From 0f0e64f3dfcb6adc5d1da0e077dfa2e221d18c64 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Mon, 18 Dec 2023 19:56:20 +0900 Subject: [PATCH 1/2] in_opentelemetry: support to add resource of log Signed-off-by: Takahiro Yamashita --- plugins/in_opentelemetry/opentelemetry_prot.c | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index d73f82d87ce..d1e5377b514 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -396,6 +397,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, int resource_logs_index; int scope_log_index; int log_record_index; + struct flb_mp_map_header mh; Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs; Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs; @@ -403,6 +405,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs; Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log; Opentelemetry__Proto__Logs__V1__LogRecord **log_records; + Opentelemetry__Proto__Resource__V1__Resource *resource; msgpack_sbuffer_init(&buffer); msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); @@ -423,6 +426,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { resource_log = resource_logs[resource_logs_index]; + resource = resource_log->resource; scope_logs = resource_log->scope_logs; if (resource_log->n_scope_logs > 0 && scope_logs == NULL) { @@ -449,6 +453,39 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, } if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_mp_map_header_init(&mh, &packer); + + /* pack resource */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(&packer, 8); + msgpack_pack_str_body(&packer, "resource", 8); + if (resource != NULL) { + msgpack_pack_map(&packer, 1); + + msgpack_pack_str(&packer, 10); + msgpack_pack_str_body(&packer, "attributes", 10); + ret = otel_pack_kvarray( + &packer, + resource->attributes, + resource->n_attributes); + } + else { + msgpack_pack_map(&packer, 0); + } + + if (ret != 0) { + flb_error("[otel] Failed to convert log resource attributes"); + goto binary_payload_to_msgpack_end; + } + + /* pack logRecords */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(&packer, 10); + msgpack_pack_str_body(&packer, "logRecords", 10); + + msgpack_pack_map(&packer, 1); + msgpack_pack_str(&packer, 10); + msgpack_pack_str_body(&packer, "attributes", 10); ret = otel_pack_kvarray( &packer, log_records[log_record_index]->attributes, @@ -460,6 +497,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; } else { + flb_mp_map_header_end(&mh); ret = flb_log_event_encoder_set_metadata_from_raw_msgpack( encoder, buffer.data, From 48eae01cf3421de2129fa34f616545709eaffc12 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Sat, 20 Jan 2024 10:42:01 +0900 Subject: [PATCH 2/2] in_opentelemetry: add a function to store metadata This function is to store fields other than body as metadata. The fields are defined at https://opentelemetry.io/docs/specs/otel/logs/data-model/#definitions-used-in-this-document Signed-off-by: Takahiro Yamashita --- plugins/in_opentelemetry/opentelemetry_prot.c | 183 ++++++++++++++---- 1 file changed, 140 insertions(+), 43 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index d1e5377b514..e9397c2ca48 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -387,6 +387,136 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck, return result; } +/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */ +static int otel_pack_v1_metadata(msgpack_packer *mp_pck, + struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record, + Opentelemetry__Proto__Resource__V1__Resource *resource, + Opentelemetry__Proto__Common__V1__InstrumentationScope *scope) +{ + struct flb_mp_map_header mh; + struct flb_mp_map_header scope_mh; + int ret; + flb_mp_map_header_init(&mh, mp_pck); + + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 17); + msgpack_pack_str_body(mp_pck, "ObservedTimestamp", 17); + msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano); + + /* Value of 0 indicates unknown or missing timestamp. */ + if (log_record->time_unix_nano != 0) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 9); + msgpack_pack_str_body(mp_pck, "Timestamp", 9); + msgpack_pack_uint64(mp_pck, log_record->time_unix_nano); + } + + /* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ + if (log_record->severity_number >= 1 && log_record->severity_number <= 24) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 14); + msgpack_pack_str_body(mp_pck, "SeverityNumber", 14); + msgpack_pack_uint64(mp_pck, log_record->severity_number); + } + + if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 12); + msgpack_pack_str_body(mp_pck, "SeverityText", 12); + msgpack_pack_str(mp_pck, strlen(log_record->severity_text)); + msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text)); + } + + if (log_record->n_attributes > 0) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 10); + msgpack_pack_str_body(mp_pck, "Attributes", 10); + ret = otel_pack_kvarray(mp_pck, + log_record->attributes, + log_record->n_attributes); + if (ret != 0) { + return ret; + } + } + + if (log_record->trace_id.len > 0) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 7); + msgpack_pack_str_body(mp_pck, "TraceId", 7); + ret = otel_pack_bytes(mp_pck, log_record->trace_id); + if (ret != 0) { + return ret; + } + } + + if (log_record->span_id.len > 0) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 6); + msgpack_pack_str_body(mp_pck, "SpanId", 6); + ret = otel_pack_bytes(mp_pck, log_record->span_id); + if (ret != 0) { + return ret; + } + } + + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 10); + msgpack_pack_str_body(mp_pck, "TraceFlags", 10); + msgpack_pack_uint8(mp_pck, (uint8_t)log_record->flags & 0xff); + + + + if (resource != NULL && resource->n_attributes > 0 && resource->attributes) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 8); + msgpack_pack_str_body(mp_pck, "Resource", 8); + + ret = otel_pack_kvarray(mp_pck, + resource->attributes, + resource->n_attributes); + if (ret != 0) { + return ret; + } + } + + if (scope != NULL && (scope->name || scope->version || scope->n_attributes > 0)) { + flb_mp_map_header_append(&mh); + msgpack_pack_str(mp_pck, 20); + msgpack_pack_str_body(mp_pck, "InstrumentationScope", 20); + + flb_mp_map_header_init(&scope_mh, mp_pck); + if (scope->name != NULL && strlen(scope->name) > 0) { + flb_mp_map_header_append(&scope_mh); + msgpack_pack_str(mp_pck, 4); + msgpack_pack_str_body(mp_pck, "Name", 4); + msgpack_pack_str(mp_pck, strlen(scope->name)); + msgpack_pack_str_body(mp_pck, scope->name, strlen(scope->name)); + } + if (scope->version != NULL && strlen(scope->version) > 0) { + flb_mp_map_header_append(&scope_mh); + msgpack_pack_str(mp_pck, 7); + msgpack_pack_str_body(mp_pck, "Version", 7); + msgpack_pack_str(mp_pck, strlen(scope->version)); + msgpack_pack_str_body(mp_pck, scope->version, strlen(scope->version)); + } + if (scope->n_attributes > 0 && scope->attributes) { + msgpack_pack_str(mp_pck, 10); + msgpack_pack_str_body(mp_pck, "Attributes", 10); + ret = otel_pack_kvarray(mp_pck, + scope->attributes, + scope->n_attributes); + if (ret != 0) { + return ret; + } + } + + flb_mp_map_header_end(&scope_mh); + } + + flb_mp_map_header_end(&mh); + return 0; +} + static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, uint8_t *in_buf, size_t in_size) @@ -394,10 +524,11 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, int ret; msgpack_packer packer; msgpack_sbuffer buffer; + msgpack_packer meta_packer; + msgpack_sbuffer meta_buffer; int resource_logs_index; int scope_log_index; int log_record_index; - struct flb_mp_map_header mh; Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs; Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs; @@ -409,6 +540,8 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, msgpack_sbuffer_init(&buffer); msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + msgpack_sbuffer_init(&meta_buffer); + msgpack_packer_init(&meta_packer, &meta_buffer, msgpack_sbuffer_write); input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf); if (input_logs == NULL) { @@ -453,55 +586,18 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - flb_mp_map_header_init(&mh, &packer); - - /* pack resource */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&packer, 8); - msgpack_pack_str_body(&packer, "resource", 8); - if (resource != NULL) { - msgpack_pack_map(&packer, 1); - - msgpack_pack_str(&packer, 10); - msgpack_pack_str_body(&packer, "attributes", 10); - ret = otel_pack_kvarray( - &packer, - resource->attributes, - resource->n_attributes); - } - else { - msgpack_pack_map(&packer, 0); - } - - if (ret != 0) { - flb_error("[otel] Failed to convert log resource attributes"); - goto binary_payload_to_msgpack_end; - } - - /* pack logRecords */ - flb_mp_map_header_append(&mh); - msgpack_pack_str(&packer, 10); - msgpack_pack_str_body(&packer, "logRecords", 10); - - msgpack_pack_map(&packer, 1); - msgpack_pack_str(&packer, 10); - msgpack_pack_str_body(&packer, "attributes", 10); - ret = otel_pack_kvarray( - &packer, - log_records[log_record_index]->attributes, - log_records[log_record_index]->n_attributes); - + msgpack_sbuffer_clear(&meta_buffer); + ret = otel_pack_v1_metadata(&meta_packer, log_records[log_record_index], resource, scope_log->scope); if (ret != 0) { - flb_error("[otel] Failed to convert log record attributes"); + flb_error("[otel] Failed to convert log record"); ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; } else { - flb_mp_map_header_end(&mh); ret = flb_log_event_encoder_set_metadata_from_raw_msgpack( encoder, - buffer.data, - buffer.size); + meta_buffer.data, + meta_buffer.size); } msgpack_sbuffer_clear(&buffer); @@ -549,6 +645,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, binary_payload_to_msgpack_end: msgpack_sbuffer_destroy(&buffer); + msgpack_sbuffer_destroy(&meta_buffer); if (input_logs) { opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked( input_logs, NULL);