Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,112 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
return 0;
}

static int append_v1_logs_message(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
return -1;
}

/* SeverityText */
if (ctx->ra_severity_text_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR) {
if(is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE){
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
}
flb_ra_key_value_destroy(ra_val);
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Invalid Severity Text.\n", ctx->ra_severity_text_message->pattern);
log_record->severity_text = NULL;
}
}
else {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
}

/* SeverityNumber */
if (ctx->ra_severity_number_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER &&
is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) {
log_record->severity_number = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}else if(ctx->ra_severity_text_message){
//TODO get sev number based off sev text
}

/* SpanId */
if (ctx->ra_span_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body);
if (ra_val != NULL) {
if(ra_val->o.type == MSGPACK_OBJECT_BIN){
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
}else if(ra_val->o.type == MSGPACK_OBJECT_STR){
log_record->span_id.data = flb_calloc(8, sizeof(uint8_t));
if (log_record->span_id.data) {
// Convert to a byte array
uint8_t val[8];
size_t count;
for(count = 0; count < sizeof val/sizeof *val; count++ ){
sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]);
ra_val->o.via.str.ptr+=2;
}
memcpy(log_record->span_id.data, val, sizeof(val));
log_record->span_id.len = sizeof(val);
}
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_span_id_message->pattern);
}
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceId */
if (ctx->ra_trace_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body);
if (ra_val != NULL) {
if(ra_val->o.type == MSGPACK_OBJECT_BIN){
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
}else if(ra_val->o.type == MSGPACK_OBJECT_STR){
log_record->trace_id.data = flb_calloc(16, sizeof(uint8_t));
if (log_record->trace_id.data) {
// Convert from hexdec string to a 16 byte array
uint8_t val[16];
size_t count;
for(count = 0; count < sizeof val/sizeof *val; count++ ){
sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]);
ra_val->o.via.str.ptr+=2;
}
memcpy(log_record->trace_id.data, val, sizeof(val));
log_record->trace_id.len = sizeof(val);
}
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_trace_id_message->pattern);
}
flb_ra_key_value_destroy(ra_val);
}
}

return 0;
}

static int process_logs(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
Expand Down Expand Up @@ -1158,6 +1264,8 @@ static int process_logs(struct flb_event_chunk *event_chunk,

append_v1_logs_metadata(ctx, &event, &log_records[log_record_count]);

append_v1_logs_message(ctx, &event, &log_records[log_record_count]);

ret = FLB_OK;

log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp);
Expand Down Expand Up @@ -1544,6 +1652,26 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_resource_metadata_key),
"Specify a Resource key"
},
{
FLB_CONFIG_MAP_STR, "logs_span_id_message_key", "$SpanId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_span_id_message_key),
"Specify a SpanId key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_id_message_key", "$TraceId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_id_message_key),
"Specify a TraceId key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_text_message_key", "$SeverityText",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_text_message_key),
"Specify a Severity Text key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_number_message_key", "$SeverityNumber",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_number_message_key),
"Specify a Severity Number key"
},

/* EOF */
{0}
Expand Down
13 changes: 13 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ struct opentelemetry_context {
flb_sds_t logs_instrumentation_scope_metadata_key;
flb_sds_t logs_resource_metadata_key;

/* otel body keys */
flb_sds_t logs_span_id_message_key;
struct flb_record_accessor *ra_span_id_message;

flb_sds_t logs_trace_id_message_key;
struct flb_record_accessor *ra_trace_id_message;

flb_sds_t logs_severity_text_message_key;
struct flb_record_accessor *ra_severity_text_message;

flb_sds_t logs_severity_number_message_key;
struct flb_record_accessor *ra_severity_number_message;

/* Number of logs to flush at a time */
int batch_size;

Expand Down
32 changes: 32 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,26 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
if (ctx->ra_attributes_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for attributes");
}
ctx->ra_span_id_message = flb_ra_create((char*)ctx->logs_span_id_message_key,
FLB_FALSE);
if (ctx->ra_span_id_message == NULL) {
flb_plg_error(ins, "failed to create ra for message span id");
}
ctx->ra_trace_id_message = flb_ra_create((char*)ctx->logs_trace_id_message_key,
FLB_FALSE);
if (ctx->ra_trace_id_message == NULL) {
flb_plg_error(ins, "failed to create ra for message trace id");
}
ctx->ra_severity_text_message = flb_ra_create((char*)ctx->logs_severity_text_message_key,
FLB_FALSE);
if (ctx->ra_severity_text_message == NULL) {
flb_plg_error(ins, "failed to create ra for message severity text");
}
ctx->ra_severity_number_message = flb_ra_create((char*)ctx->logs_severity_number_message_key,
FLB_FALSE);
if (ctx->ra_severity_number_message == NULL) {
flb_plg_error(ins, "failed to create ra for message severity number");
}

return ctx;
}
Expand Down Expand Up @@ -466,6 +486,18 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
if (ctx->ra_attributes_metadata) {
flb_ra_destroy(ctx->ra_attributes_metadata);
}
if (ctx->ra_span_id_message) {
flb_ra_destroy(ctx->ra_span_id_message);
}
if (ctx->ra_trace_id_message) {
flb_ra_destroy(ctx->ra_trace_id_message);
}
if (ctx->ra_severity_text_message) {
flb_ra_destroy(ctx->ra_severity_text_message);
}
if (ctx->ra_severity_number_message) {
flb_ra_destroy(ctx->ra_severity_number_message);
}

flb_free(ctx->proxy_host);
flb_free(ctx);
Expand Down