Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 228 additions & 35 deletions plugins/out_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,52 @@
#include "opentelemetry_conf.h"
#include "opentelemetry_utils.h"

static int hex_to_int(char ch)
{
if (ch >= '0' && ch <= '9') {
return ch - '0';
}

if (ch >= 'a' && ch <= 'f') {
return ch - 'a' + 10;
}

if (ch >= 'A' && ch <= 'F') {
return ch - 'A' + 10;
}

return -1;
}

/* convert an hex string to the expected id (16 bytes) */
static int hex_to_id(char *str, int len, unsigned char *out_buf, int out_size)
{
int i;
int high;
int low;

if (len % 2 != 0) {
return -1;
}

for (i = 0; i < len; i += 2) {
if (!isxdigit(str[i]) || !isxdigit(str[i + 1])) {
return -1;
}

high = hex_to_int(str[i]);
low = hex_to_int(str[i + 1]);

if (high == -1 || low == -1) {
return -1;
}

out_buf[i / 2] = (high << 4) | low;
}

return 0;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
static int is_valid_severity_text(const char *str, size_t str_len)
{
Expand Down Expand Up @@ -281,10 +327,94 @@ static int log_record_set_attributes(struct opentelemetry_context *ctx,
return 0;
}

static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
static int pack_trace_id(struct opentelemetry_context *ctx,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
struct flb_ra_value *ra_val)
{
int ret;

if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (!log_record->trace_id.data) {
return -1;
}
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
else if (ra_val->o.type == MSGPACK_OBJECT_STR) {
if (ra_val->o.via.str.size > 32) {
return -1;
}

log_record->trace_id.data = flb_calloc(1, 16);
if (!log_record->trace_id.data) {
flb_errno();
return -1;
}

ret = hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size,
log_record->trace_id.data, 16);
if (ret == 0) {
log_record->trace_id.len = 16;
return 0;
}

flb_plg_warn(ctx->ins, "invalid trace_id format");
flb_free(log_record->trace_id.data);
log_record->trace_id.data = NULL;
log_record->trace_id.len = 0;
}
else {
flb_plg_warn(ctx->ins, "invalid trace_id type");
}

return -1;
}

static int pack_span_id(struct opentelemetry_context *ctx,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
struct flb_ra_value *ra_val)
{
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (!log_record->span_id.data) {
return -1;
}
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
else if (ra_val->o.type == MSGPACK_OBJECT_STR) {
if (ra_val->o.via.str.size > 16) {
return -1;
}

log_record->span_id.data = flb_calloc(1, 8);
if (!log_record->span_id.data) {
flb_errno();
return -1;
}

hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size,
log_record->span_id.data, 8);
log_record->span_id.len = 8;
}
else {
flb_plg_warn(ctx->ins, "invalid span_id type");
}

return 0;
}

static int append_v1_logs_metadata_and_fields(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
int ret;
int span_id_set = FLB_FALSE;
int trace_id_set = FLB_FALSE;
int severity_text_set = FLB_FALSE;
int severity_number_set = FLB_FALSE;
int trace_flags_set = FLB_FALSE;
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
Expand Down Expand Up @@ -335,14 +465,28 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) {
log_record->severity_number = ra_val->o.via.u64;
severity_number_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_severity_number_metadata) {

if (!severity_number_set && ctx->ra_severity_number_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) {
log_record->severity_number = ra_val->o.via.u64;
severity_number_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!severity_number_set && ctx->ra_severity_number_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) {
log_record->severity_number = ra_val->o.via.u64;
severity_number_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
Expand All @@ -357,24 +501,43 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
severity_text_set = FLB_TRUE;
}
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_severity_text_metadata) {

if (!severity_text_set && ctx->ra_severity_text_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size)) {
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
severity_text_set = FLB_TRUE;
}
}
flb_ra_key_value_destroy(ra_val);
}
}
else {

if (!severity_text_set && ctx->ra_severity_text_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_STR &&
is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size)) {
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
severity_text_set = FLB_TRUE;
}
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!severity_text_set) {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
Expand Down Expand Up @@ -406,47 +569,62 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
/* TraceId */
ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_trace_id, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
ret = pack_trace_id(ctx, log_record, ra_val);
if (ret == 0) {
trace_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_trace_id_metadata) {

if (!trace_id_set && ctx->ra_trace_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
if (ra_val != NULL) {
ret = pack_trace_id(ctx, log_record, ra_val);
if (ret == 0) {
trace_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!trace_id_set && ctx->ra_trace_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body);
if (ra_val != NULL) {
ret = pack_trace_id(ctx, log_record, ra_val);
if (ret == 0) {
trace_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

/* SpanId */
ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_span_id, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
ret = pack_span_id(ctx, log_record, ra_val);
if (ret == 0) {
span_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_span_id_metadata) {

if (!span_id_set && ctx->ra_span_id_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_BIN) {
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
ret = pack_span_id(ctx, log_record, ra_val);
if (ret == 0) {
span_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
}

if (!span_id_set && ctx->ra_span_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body);
if (ra_val != NULL) {
ret = pack_span_id(ctx, log_record, ra_val);
if (ret == 0) {
span_id_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
Expand All @@ -457,14 +635,17 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t) ra_val->o.via.u64;
trace_flags_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
else if (ctx->ra_trace_flags_metadata) {

if (!trace_flags_set && ctx->ra_trace_flags_metadata) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata);
if (ra_val != NULL) {
if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
log_record->flags = (uint32_t) ra_val->o.via.u64;
trace_flags_set = FLB_TRUE;
}
flb_ra_key_value_destroy(ra_val);
}
Expand Down Expand Up @@ -740,6 +921,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
int log_record_count;
int max_scopes;
int max_resources;
int native_otel = FLB_FALSE;
int64_t prev_group_resource_id = -1;
int64_t prev_group_scope_id = -1;
int64_t resource_id = -1;
Expand Down Expand Up @@ -807,6 +989,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
continue;
}

/* flag this as a native otel schema */
native_otel = FLB_TRUE;

if (resource_id == -1 && prev_group_resource_id >= 0 && prev_group_resource_id == tmp_resource_id) {
/* continue with the previous resource */
resource_id = prev_group_resource_id;
Expand Down Expand Up @@ -920,8 +1105,14 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,

ret = FLB_OK;

/* since we are starting a new group, just continue with the next record */
continue;
/*
* if we started a new group through a valid OTLP schema, just continue since the active record
* is a group start. If native_otel is off it means the packaging was done for a record which is
* not OTLP schema compatible so it needs to be processed (do not skip it).
*/
if (native_otel) {
continue;
}
}
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
/* do nothing */
Expand All @@ -930,6 +1121,8 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
prev_group_scope_id = scope_id;
resource_id = -1;
scope_id = -1;
native_otel = FLB_FALSE;

continue;
}

Expand Down Expand Up @@ -981,7 +1174,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk,
break;
}

append_v1_logs_metadata(ctx, &event, log_record);
append_v1_logs_metadata_and_fields(ctx, &event, log_record);

ret = FLB_OK;
log_record_count++;
Expand Down