Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 143 additions & 8 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_snappy.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_log_event_encoder.h>

#include <monkey/monkey.h>
Expand Down Expand Up @@ -386,13 +387,145 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck,
return result;
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */
static int otel_pack_v1_metadata(msgpack_packer *mp_pck,
struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
Opentelemetry__Proto__Resource__V1__Resource *resource,
Opentelemetry__Proto__Common__V1__InstrumentationScope *scope)
{
struct flb_mp_map_header mh;
struct flb_mp_map_header scope_mh;
int ret;
flb_mp_map_header_init(&mh, mp_pck);

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 17);
msgpack_pack_str_body(mp_pck, "ObservedTimestamp", 17);
msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano);

/* Value of 0 indicates unknown or missing timestamp. */
if (log_record->time_unix_nano != 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 9);
msgpack_pack_str_body(mp_pck, "Timestamp", 9);
msgpack_pack_uint64(mp_pck, log_record->time_unix_nano);
}

/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
if (log_record->severity_number >= 1 && log_record->severity_number <= 24) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 14);
msgpack_pack_str_body(mp_pck, "SeverityNumber", 14);
msgpack_pack_uint64(mp_pck, log_record->severity_number);
}

if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 12);
msgpack_pack_str_body(mp_pck, "SeverityText", 12);
msgpack_pack_str(mp_pck, strlen(log_record->severity_text));
msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text));
}

if (log_record->n_attributes > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "Attributes", 10);
ret = otel_pack_kvarray(mp_pck,
log_record->attributes,
log_record->n_attributes);
if (ret != 0) {
return ret;
}
}

if (log_record->trace_id.len > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "TraceId", 7);
ret = otel_pack_bytes(mp_pck, log_record->trace_id);
if (ret != 0) {
return ret;
}
}

if (log_record->span_id.len > 0) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 6);
msgpack_pack_str_body(mp_pck, "SpanId", 6);
ret = otel_pack_bytes(mp_pck, log_record->span_id);
if (ret != 0) {
return ret;
}
}

flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "TraceFlags", 10);
msgpack_pack_uint8(mp_pck, (uint8_t)log_record->flags & 0xff);



if (resource != NULL && resource->n_attributes > 0 && resource->attributes) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 8);
msgpack_pack_str_body(mp_pck, "Resource", 8);

ret = otel_pack_kvarray(mp_pck,
resource->attributes,
resource->n_attributes);
if (ret != 0) {
return ret;
}
}

if (scope != NULL && (scope->name || scope->version || scope->n_attributes > 0)) {
flb_mp_map_header_append(&mh);
msgpack_pack_str(mp_pck, 20);
msgpack_pack_str_body(mp_pck, "InstrumentationScope", 20);

flb_mp_map_header_init(&scope_mh, mp_pck);
if (scope->name != NULL && strlen(scope->name) > 0) {
flb_mp_map_header_append(&scope_mh);
msgpack_pack_str(mp_pck, 4);
msgpack_pack_str_body(mp_pck, "Name", 4);
msgpack_pack_str(mp_pck, strlen(scope->name));
msgpack_pack_str_body(mp_pck, scope->name, strlen(scope->name));
}
if (scope->version != NULL && strlen(scope->version) > 0) {
flb_mp_map_header_append(&scope_mh);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "Version", 7);
msgpack_pack_str(mp_pck, strlen(scope->version));
msgpack_pack_str_body(mp_pck, scope->version, strlen(scope->version));
}
if (scope->n_attributes > 0 && scope->attributes) {
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "Attributes", 10);
ret = otel_pack_kvarray(mp_pck,
scope->attributes,
scope->n_attributes);
if (ret != 0) {
return ret;
}
}

flb_mp_map_header_end(&scope_mh);
}

flb_mp_map_header_end(&mh);
return 0;
}

static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
uint8_t *in_buf,
size_t in_size)
{
int ret;
msgpack_packer packer;
msgpack_sbuffer buffer;
msgpack_packer meta_packer;
msgpack_sbuffer meta_buffer;
int resource_logs_index;
int scope_log_index;
int log_record_index;
Expand All @@ -403,9 +536,12 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs;
Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log;
Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
Opentelemetry__Proto__Resource__V1__Resource *resource;

msgpack_sbuffer_init(&buffer);
msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
msgpack_sbuffer_init(&meta_buffer);
msgpack_packer_init(&meta_packer, &meta_buffer, msgpack_sbuffer_write);

input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
if (input_logs == NULL) {
Expand All @@ -423,6 +559,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
resource_log = resource_logs[resource_logs_index];
resource = resource_log->resource;
scope_logs = resource_log->scope_logs;

if (resource_log->n_scope_logs > 0 && scope_logs == NULL) {
Expand All @@ -449,21 +586,18 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = otel_pack_kvarray(
&packer,
log_records[log_record_index]->attributes,
log_records[log_record_index]->n_attributes);

msgpack_sbuffer_clear(&meta_buffer);
ret = otel_pack_v1_metadata(&meta_packer, log_records[log_record_index], resource, scope_log->scope);
if (ret != 0) {
flb_error("[otel] Failed to convert log record attributes");
flb_error("[otel] Failed to convert log record");

ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
encoder,
buffer.data,
buffer.size);
meta_buffer.data,
meta_buffer.size);
}

msgpack_sbuffer_clear(&buffer);
Expand Down Expand Up @@ -511,6 +645,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

binary_payload_to_msgpack_end:
msgpack_sbuffer_destroy(&buffer);
msgpack_sbuffer_destroy(&meta_buffer);
if (input_logs) {
opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
input_logs, NULL);
Expand Down