Skip to content
Merged
39 changes: 39 additions & 0 deletions include/fluent-bit/flb_msgpack_append_message.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2023 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_MSGPACK_APPEND_MESSAGE_H
#define FLB_MSGPACK_APPEND_MESSAGE_H

/* Error codes */
#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

#include <fluent-bit/flb_pack.h>

int flb_msgpack_append_message_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type);
#endif
5 changes: 5 additions & 0 deletions plugins/in_tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_tcp_config, buffer_size_str),
"Set the buffer size"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_in_tcp_config, source_address_key),
"Key where the source address will be injected"
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct flb_in_tcp_config {
char *tcp_port; /* TCP Port */
flb_sds_t raw_separator; /* Unescaped string delimiterr */
flb_sds_t separator; /* String delimiter */
flb_sds_t source_address_key; /* Source IP address */
int collector_id; /* Listener collector id */
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
Expand Down
54 changes: 51 additions & 3 deletions plugins/in_tcp/tcp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <fluent-bit/flb_network.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_msgpack_append_message.h>

#include "tcp.h"
#include "tcp_conn.h"
Expand All @@ -37,9 +38,13 @@ static inline int process_pack(struct tcp_conn *conn,
{
int ret;
size_t off = 0;
size_t prev_off = 0;
msgpack_unpacked result;
msgpack_object entry;
struct flb_in_tcp_config *ctx;
char *appended_address_buffer;
size_t appended_address_size;
char *source_address;

ctx = conn->ctx;

Expand All @@ -50,22 +55,60 @@ static inline int process_pack(struct tcp_conn *conn,
while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
entry = result.data;

appended_address_buffer = NULL;
source_address = NULL;

ret = flb_log_event_encoder_begin_record(ctx->log_encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(conn->connection);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (entry.type == MSGPACK_OBJECT_MAP) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
ctx->log_encoder, &entry);
if (ctx->source_address_key != NULL && source_address != NULL) {
ret = flb_msgpack_append_message_to_record(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
pack + prev_off,
size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
}

if (ret == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret);
}

if (appended_address_buffer != NULL) {
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
ctx->log_encoder, &entry);
}
}
else if (entry.type == MSGPACK_OBJECT_ARRAY) {
ret = flb_log_event_encoder_append_body_values(
if (ctx->source_address_key != NULL && source_address != NULL) {
ret = flb_log_event_encoder_append_body_values(
ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key),
FLB_LOG_EVENT_CSTRING_VALUE(source_address));
}
else {
ret = flb_log_event_encoder_append_body_values(
ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry));
}
}
else {
ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE;
Expand All @@ -75,10 +118,15 @@ static inline int process_pack(struct tcp_conn *conn,
ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
}

if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
break;
}
}
prev_off = off;
}

msgpack_unpacked_destroy(&result);
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ set(src
flb_log_event_encoder_dynamic_field.c
flb_processor.c
flb_reload.c
flb_msgpack_append_message.c
)

# Config format
Expand Down
82 changes: 82 additions & 0 deletions src/flb_msgpack_append_message.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2023 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_msgpack_append_message.h>

int flb_msgpack_append_message_to_record(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int result = FLB_MAP_NOT_MODIFIED;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_STR;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
}

if (result == FLB_MAP_NOT_MODIFIED) {
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
}
}

if (result == FLB_MAP_EXPAND_SUCCESS) {
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}

return result;
}
1 change: 1 addition & 0 deletions tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(UNIT_TESTS_FILES
log_event_decoder.c
processor.c
uri.c
msgpack_append_message.c
)

# Config format
Expand Down
5 changes: 5 additions & 0 deletions tests/internal/data/msgpack_append_message/map1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"key1": 123456789,
"key2": 0.999887766,
"key3": "abcdefghijklmnopqrstuvwxyz",
"key4": [{"a": 10, "b": 20}, {"c": 30, "d": 40}]
}
109 changes: 109 additions & 0 deletions tests/internal/msgpack_append_message.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_msgpack_append_message.h>
#include <monkey/mk_core.h>

#include <sys/types.h>
#include <sys/stat.h>

/* JSON tests data */
#define JSON_MAP1 FLB_TESTS_DATA_PATH "/data/msgpack_append_message/map1.json"

#include "flb_tests_internal.h"

struct msgpack_append_message_test {
char *msgpack;
char *json;
};

static inline int process_pack(char *pack, size_t size)
{
int ret;
msgpack_unpacked result;
char *appended_buffer = NULL;
size_t appended_size;
char *inject_message = "injected";
char *inject_key_name = "expanding";
flb_sds_t inject_key;
size_t off = 0;
size_t prev_off = 0;
flb_sds_t out_buf;
char *p = NULL;

inject_key = flb_sds_create_len(inject_key_name, strlen(inject_key_name));
if (!inject_key) {
flb_errno();
return -1;
}
msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
if (result.data.type == MSGPACK_OBJECT_MAP) {
ret = flb_msgpack_append_message_to_record(&appended_buffer,
&appended_size,
inject_key,
pack + prev_off,
size,
inject_message,
8,
MSGPACK_OBJECT_STR);
TEST_CHECK(ret == 0);

out_buf = flb_msgpack_raw_to_json_sds(appended_buffer, appended_size);
TEST_CHECK(out_buf != NULL);
p = strstr(out_buf, "\"expanding\":\"injected\"");
if (!TEST_CHECK(p != NULL)) {
TEST_MSG("\"expanding\":\"injected\" should be appended. out_buf=%s", out_buf);
}
if (out_buf) {
flb_sds_destroy(out_buf);
}
}
prev_off = off;
}

msgpack_unpacked_destroy(&result);

flb_sds_destroy(inject_key);
flb_free(appended_buffer);

return ret;
}

/* Append a single key-value pair into msgpack map */
void test_append_basic()
{
int ret;
size_t len;
char *data;
char *pack;
int out_size;
struct flb_pack_state state;

data = mk_file_to_buffer(JSON_MAP1);
TEST_CHECK(data != NULL);

len = strlen(data);

ret = flb_pack_state_init(&state);
TEST_CHECK(ret == 0);

ret = flb_pack_json_state(data, len, &pack, &out_size, &state);
TEST_CHECK(ret == 0);

ret = process_pack(pack, out_size);
TEST_CHECK(ret == 0);

flb_pack_state_reset(&state);
flb_free(data);
flb_free(pack);
}

TEST_LIST = {
{ "basic", test_append_basic },
{ 0 }
};
Loading