Skip to content
Merged
4 changes: 4 additions & 0 deletions cmake/msgpack.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ option(MSGPACK_BUILD_EXAMPLES OFF)
include_directories(
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MSGPACK}/include
)

# define preprocessor MSGPACK_EMBED_STACK_SIZE to 1024
add_compile_definitions(MSGPACK_EMBED_STACK_SIZE=64)

add_subdirectory(${FLB_PATH_LIB_MSGPACK} EXCLUDE_FROM_ALL)
set(MSGPACK_LIBRARIES "msgpack-c-static")
5 changes: 5 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck,
result = otel_pack_bytes(mp_pck, body->bytes_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET:
/* treat an unset value as null */
result = msgpack_pack_nil(mp_pck);
break;

default:
break;
}
Expand Down
5 changes: 4 additions & 1 deletion src/flb_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ int flb_json_tokenise(const char *js, size_t len,
state->tokens = tmp;
state->tokens_size += new_tokens;

/* Reset parser to reprocess the JSON data from the beginning */
jsmn_init(&state->parser);

ret = jsmn_parse(&state->parser, js, len,
state->tokens, state->tokens_size);
}
Expand Down Expand Up @@ -217,7 +220,7 @@ static char *tokens_to_msgpack(struct flb_pack_state *state,
for (i = 0; i < arr_size ; i++) {
t = &tokens[i];

if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
if (t->start < 0 || t->end <= 0) {
msgpack_sbuffer_destroy(&sbuf);
return NULL;
}
Expand Down
45 changes: 33 additions & 12 deletions src/opentelemetry/flb_opentelemetry_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ int flb_otel_utils_json_payload_get_wrapped_value(msgpack_object *wrapper,
internal_type = MSGPACK_OBJECT_BIN;
}
else if (strncasecmp(kv_key->ptr, "arrayValue", kv_key->size) == 0) {
if (kv_value->type != MSGPACK_OBJECT_ARRAY) {
/* If the value is not an array, we cannot process it */
if (kv_value->type != MSGPACK_OBJECT_ARRAY &&
kv_value->type != MSGPACK_OBJECT_MAP) {
/* If the value is not an array or map, we cannot process it */
return -2;
}
internal_type = MSGPACK_OBJECT_ARRAY;
Expand Down Expand Up @@ -233,7 +234,6 @@ int flb_otel_utils_json_payload_append_converted_value(
target_field,
(char *) object->via.str.ptr,
object->via.str.size);

break;
case MSGPACK_OBJECT_NIL:
/* Append a null value */
Expand Down Expand Up @@ -262,7 +262,6 @@ int flb_otel_utils_json_payload_append_converted_value(
encoder,
target_field,
object);

break;
default:
break;
Expand Down Expand Up @@ -375,11 +374,8 @@ int flb_otel_utils_json_payload_append_converted_map(
object,
&encoder_result);

if (result == 0 && encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
return result;
}
else if (result != 0) {
return result;
if (result == 0) {
return encoder_result;
}

result = flb_log_event_encoder_begin_map(encoder, target_field);
Expand Down Expand Up @@ -451,6 +447,9 @@ int flb_otel_utils_json_payload_append_converted_kvlist(
int value_index;
int key_index;
int result;
int pack_null_value = FLB_FALSE;
int pack_string_value = FLB_FALSE;
int pack_value = FLB_FALSE;
size_t index;
msgpack_object_array *array;
msgpack_object_map *entry;
Expand All @@ -476,15 +475,33 @@ int flb_otel_utils_json_payload_append_converted_kvlist(
result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
}

value_index = -1;
pack_null_value = FLB_FALSE;
pack_string_value = FLB_FALSE;
pack_value = FLB_FALSE;

if (result == FLB_EVENT_ENCODER_SUCCESS) {
value_index = flb_otel_utils_find_map_entry_by_key(entry, "value", 0, FLB_TRUE);

if (value_index >= 0 &&
entry->ptr[value_index].val.type == MSGPACK_OBJECT_MAP &&
entry->ptr[value_index].val.via.map.size == 0) {
/*
* if value is an empty map it represents an unset value, pack as NULL
*/
pack_null_value = FLB_TRUE;
}
}

if (value_index == -1) {
/*
* if value is missing basically is 'unset' and handle as Empty() in OTel world, in
* this case we just pack an empty string value
*/
pack_string_value = FLB_TRUE;
}
else if (!pack_null_value) {
pack_value = FLB_TRUE;
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -495,14 +512,18 @@ int flb_otel_utils_json_payload_append_converted_kvlist(
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
/* if the value is not set, register an empty string as value */
if (value_index == -1) {
if (pack_null_value) {
/* pack NULL for unset values (empty maps) */
result = flb_log_event_encoder_append_null(encoder, target_field);
}
else if (pack_string_value) {
/* if the value is not set, register an empty string as value */
result = flb_log_event_encoder_append_string(
encoder,
target_field,
"", 0);
}
else {
else if (pack_value) {
/* expected value must come in a map */
if (entry->ptr[value_index].val.type != MSGPACK_OBJECT_MAP) {
result = -1;
Expand Down
97 changes: 97 additions & 0 deletions tests/internal/data/opentelemetry/test_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1225,5 +1225,102 @@
"log_metadata": {"otlp":{}},
"log_body": {"test_key": "", "normal_key": "normal_value"}
}
},
"issue_10672_empty_map_as_null": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1722904465173450100",
"body": {
"kvlistValue": {
"values": [
{"key": "empty_map_key", "value": {}},
{"key": "normal_key", "value": {"stringValue": "normal_value"}},
{"key": "missing_value_key"}
]
}
}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"empty_map_key": null, "normal_key": "normal_value", "missing_value_key": ""}
}
},
"arrayvalue_wrapped_map": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1640995200000000000",
"body": {
"kvlistValue": {
"values": [{
"key": "a",
"value": {
"kvlistValue": {
"values": [{
"key": "d",
"value": {
"arrayValue": {
"values": [{
"kvlistValue": {
"values": [{
"key": "e",
"value": {"stringValue": "g"}
}]
}
}]
}
}
}]
}
}
}]
}
}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"a":{"d":[{"e":"g"}]}}
}
},
"unwrapped_plain_map": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1640995200000000000",
"body": {
"kvlistValue": {
"values": [{
"key": "plain",
"value": {
"key1": {"stringValue": "v1"},
"key2": {"intValue": 2}
}
}]
}
}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"plain": {"key1": "v1", "key2": 2}}
}
}
}
35 changes: 35 additions & 0 deletions tests/internal/pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,40 @@ void test_json_pack_bug5336()
}
}

/* Ensure empty arrays inside nested objects are handled */
void test_json_pack_empty_array()
{
int ret;
int root_type;
size_t out_size;
char *out_buf;
char *json = "{\"resourceLogs\":[{\"resource\":{},\"scopeLogs\":[{\"scope\":{},\"logRecords\":[{\"observedTimeUnixNano\":\"1754059282910920618\",\"body\":{\"kvlistValue\":{\"values\":[{\"key\":\"a\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"b\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"c\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"d\",\"value\":{\"arrayValue\":{\"values\":[{\"kvlistValue\":{\"values\":[{\"key\":\"e\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"f\",\"value\":{\"stringValue\":\"g\"}}]}}}]}}]}}}]}}}]}}}]}}}]}}}]}]}]}";

ret = flb_pack_json( (char*) json, strlen((char *) json), &out_buf, &out_size, &root_type, NULL);
TEST_CHECK(ret == 0);

if (ret != 0) {
printf("flb_pack_json failed: %d\n", ret);
exit(EXIT_FAILURE);
}

/* unpack just to validate the msgpack buffer */
msgpack_unpacked result;
msgpack_object obj;
size_t off = 0;
msgpack_unpacked_init(&result);
off = 0;
TEST_CHECK(msgpack_unpack_next(&result, out_buf, out_size, &off) == MSGPACK_UNPACK_SUCCESS);

printf("\nmsgpack---:\n");
msgpack_object_print(stdout, result.data);
printf("\n");

msgpack_unpacked_destroy(&result);

flb_free(out_buf);
}

const char input_msgpack[] = {0x92,/* array 2 */
0xd7, 0x00, /* event time*/
0x07, 0x5b, 0xcd, 0x15, /* second = 123456789 = 1973/11/29 21:33:09 */
Expand Down Expand Up @@ -1115,6 +1149,7 @@ TEST_LIST = {
{ "json_pack_bug1278" , test_json_pack_bug1278},
{ "json_pack_nan" , test_json_pack_nan},
{ "json_pack_bug5336" , test_json_pack_bug5336},
{ "json_pack_empty_array", test_json_pack_empty_array},
{ "json_date_iso8601" , test_json_date_iso8601},
{ "json_date_double" , test_json_date_double},
{ "json_date_java_sql" , test_json_date_java_sql},
Expand Down
Loading