Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/in_udp/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_udp_config, buffer_size_str),
"Set the buffer size"
},
{
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_in_udp_config, source_address_key),
"Key where the source address will be injected"
},
/* EOF */
{0}
};
Expand Down
1 change: 1 addition & 0 deletions plugins/in_udp/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct flb_in_udp_config {
char *port; /* Port */
flb_sds_t raw_separator; /* Unescaped string delimiterr */
flb_sds_t separator; /* String delimiter */
flb_sds_t source_address_key; /* Source IP address */
int collector_id; /* Listener collector id */
struct flb_downstream *downstream; /* Client manager */
struct udp_conn *dummy_conn; /* Datagram dummy connection */
Expand Down
128 changes: 125 additions & 3 deletions plugins/in_udp/udp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,83 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

static int append_message_to_record_data(char **result_buffer,
size_t *result_size,
flb_sds_t message_key_name,
char *base_object_buffer,
size_t base_object_size,
char *message_buffer,
size_t message_size,
int message_type)
{
int result = FLB_MAP_NOT_MODIFIED;
char *modified_data_buffer;
int modified_data_size;
msgpack_object_kv *new_map_entries[1];
msgpack_object_kv message_entry;
*result_buffer = NULL;
*result_size = 0;
modified_data_buffer = NULL;

if (message_key_name != NULL) {
new_map_entries[0] = &message_entry;

message_entry.key.type = MSGPACK_OBJECT_STR;
message_entry.key.via.str.size = flb_sds_len(message_key_name);
message_entry.key.via.str.ptr = message_key_name;

if (message_type == MSGPACK_OBJECT_BIN) {
message_entry.val.type = MSGPACK_OBJECT_BIN;
message_entry.val.via.bin.size = message_size;
message_entry.val.via.bin.ptr = message_buffer;
}
else if (message_type == MSGPACK_OBJECT_STR) {
message_entry.val.type = MSGPACK_OBJECT_STR;
message_entry.val.via.str.size = message_size;
message_entry.val.via.str.ptr = message_buffer;
}
else {
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
}

if (result == FLB_MAP_NOT_MODIFIED) {
result = flb_msgpack_expand_map(base_object_buffer,
base_object_size,
new_map_entries, 1,
&modified_data_buffer,
&modified_data_size);
if (result == 0) {
result = FLB_MAP_EXPAND_SUCCESS;
}
else {
result = FLB_MAP_EXPANSION_ERROR;
}
}
}

if (result == FLB_MAP_EXPAND_SUCCESS) {
*result_buffer = modified_data_buffer;
*result_size = modified_data_size;
}

return result;
}

static inline int process_pack(struct udp_conn *conn,
char *pack, size_t size)
{
int ret;
size_t off = 0;
msgpack_unpacked result;
msgpack_object entry;
msgpack_sbuffer sbuf;
msgpack_packer pck;
struct flb_in_udp_config *ctx;
char *appended_address_buffer;
size_t appended_address_size;
char *source_address;
int i;
int len;

ctx = conn->ctx;

Expand All @@ -50,23 +119,72 @@ static inline int process_pack(struct udp_conn *conn,
while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
entry = result.data;

appended_address_buffer = NULL;
source_address = NULL;

ret = flb_log_event_encoder_begin_record(ctx->log_encoder);

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
}

if (ctx->source_address_key != NULL) {
source_address = flb_connection_get_remote_address(conn->connection);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
if (entry.type == MSGPACK_OBJECT_MAP) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
ctx->log_encoder, &entry);
if (source_address != NULL) {
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);

len = entry.via.map.size;
msgpack_pack_map(&pck, len);

for (i=0; i<len; i++) {
msgpack_pack_object(&pck, entry.via.map.ptr[i].key);
msgpack_pack_object(&pck, entry.via.map.ptr[i].val);
}

ret = append_message_to_record_data(&appended_address_buffer,
&appended_address_size,
ctx->source_address_key,
sbuf.data,
sbuf.size,
source_address,
strlen(source_address),
MSGPACK_OBJECT_STR);
msgpack_sbuffer_destroy(&sbuf);
}

if (ret == FLB_MAP_EXPANSION_ERROR) {
flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret);
}

if (appended_address_buffer != NULL) {
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
ctx->log_encoder, appended_address_buffer, appended_address_size);
}
else {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
ctx->log_encoder, &entry);
}
}
else if (entry.type == MSGPACK_OBJECT_ARRAY) {
ret = flb_log_event_encoder_append_body_values(
if (source_address != NULL) {
ret = flb_log_event_encoder_append_body_values(
ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key),
FLB_LOG_EVENT_CSTRING_VALUE(source_address));
}
else {
ret = flb_log_event_encoder_append_body_values(
ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry));
}
}
else {
ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE;
Expand All @@ -76,6 +194,10 @@ static inline int process_pack(struct udp_conn *conn,
ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
}

if (appended_address_buffer != NULL) {
flb_free(appended_address_buffer);
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
break;
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/in_udp/udp_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

#define FLB_IN_UDP_CHUNK "32768"

#define FLB_MAP_EXPAND_SUCCESS 0
#define FLB_MAP_NOT_MODIFIED -1
#define FLB_MAP_EXPANSION_ERROR -2
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3

struct udp_conn_stream {
char *tag;
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_SPLUNK "in_splunk.c")
FLB_RT_TEST(FLB_IN_SYSLOG "in_syslog.c")
FLB_RT_TEST(FLB_IN_TAIL "in_tail.c")
FLB_RT_TEST(FLB_IN_UDP "in_udp.c")
FLB_RT_TEST(FLB_IN_TCP "in_tcp.c")
FLB_RT_TEST(FLB_IN_FORWARD "in_forward.c")
FLB_RT_TEST(FLB_IN_FLUENTBIT_METRICS "in_fluentbit_metrics.c")
Expand Down
Loading