Skip to content

Commit 9763d0b

Browse files
cosmo0920Leonardo Alminana
authored andcommitted
in_udp: Add a capability to inject source IP (#7673)
--------- Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 996d569 commit 9763d0b

File tree

6 files changed

+576
-3
lines changed

6 files changed

+576
-3
lines changed

plugins/in_udp/udp.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ static struct flb_config_map config_map[] = {
174174
0, FLB_TRUE, offsetof(struct flb_in_udp_config, buffer_size_str),
175175
"Set the buffer size"
176176
},
177+
{
178+
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
179+
0, FLB_TRUE, offsetof(struct flb_in_udp_config, source_address_key),
180+
"Key where the source address will be injected"
181+
},
177182
/* EOF */
178183
{0}
179184
};

plugins/in_udp/udp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct flb_in_udp_config {
4343
char *port; /* Port */
4444
flb_sds_t raw_separator; /* Unescaped string delimiterr */
4545
flb_sds_t separator; /* String delimiter */
46+
flb_sds_t source_address_key; /* Source IP address */
4647
int collector_id; /* Listener collector id */
4748
struct flb_downstream *downstream; /* Client manager */
4849
struct udp_conn *dummy_conn; /* Datagram dummy connection */

plugins/in_udp/udp_conn.c

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,83 @@ static inline void consume_bytes(char *buf, int bytes, int length)
3232
memmove(buf, buf + bytes, length - bytes);
3333
}
3434

35+
static int append_message_to_record_data(char **result_buffer,
36+
size_t *result_size,
37+
flb_sds_t message_key_name,
38+
char *base_object_buffer,
39+
size_t base_object_size,
40+
char *message_buffer,
41+
size_t message_size,
42+
int message_type)
43+
{
44+
int result = FLB_MAP_NOT_MODIFIED;
45+
char *modified_data_buffer;
46+
int modified_data_size;
47+
msgpack_object_kv *new_map_entries[1];
48+
msgpack_object_kv message_entry;
49+
*result_buffer = NULL;
50+
*result_size = 0;
51+
modified_data_buffer = NULL;
52+
53+
if (message_key_name != NULL) {
54+
new_map_entries[0] = &message_entry;
55+
56+
message_entry.key.type = MSGPACK_OBJECT_STR;
57+
message_entry.key.via.str.size = flb_sds_len(message_key_name);
58+
message_entry.key.via.str.ptr = message_key_name;
59+
60+
if (message_type == MSGPACK_OBJECT_BIN) {
61+
message_entry.val.type = MSGPACK_OBJECT_BIN;
62+
message_entry.val.via.bin.size = message_size;
63+
message_entry.val.via.bin.ptr = message_buffer;
64+
}
65+
else if (message_type == MSGPACK_OBJECT_STR) {
66+
message_entry.val.type = MSGPACK_OBJECT_STR;
67+
message_entry.val.via.str.size = message_size;
68+
message_entry.val.via.str.ptr = message_buffer;
69+
}
70+
else {
71+
result = FLB_MAP_EXPANSION_INVALID_VALUE_TYPE;
72+
}
73+
74+
if (result == FLB_MAP_NOT_MODIFIED) {
75+
result = flb_msgpack_expand_map(base_object_buffer,
76+
base_object_size,
77+
new_map_entries, 1,
78+
&modified_data_buffer,
79+
&modified_data_size);
80+
if (result == 0) {
81+
result = FLB_MAP_EXPAND_SUCCESS;
82+
}
83+
else {
84+
result = FLB_MAP_EXPANSION_ERROR;
85+
}
86+
}
87+
}
88+
89+
if (result == FLB_MAP_EXPAND_SUCCESS) {
90+
*result_buffer = modified_data_buffer;
91+
*result_size = modified_data_size;
92+
}
93+
94+
return result;
95+
}
96+
3597
static inline int process_pack(struct udp_conn *conn,
3698
char *pack, size_t size)
3799
{
38100
int ret;
39101
size_t off = 0;
40102
msgpack_unpacked result;
41103
msgpack_object entry;
104+
msgpack_sbuffer sbuf;
105+
msgpack_packer pck;
42106
struct flb_in_udp_config *ctx;
107+
char *appended_address_buffer;
108+
size_t appended_address_size;
109+
char *source_address;
110+
int i;
111+
int len;
43112

44113
ctx = conn->ctx;
45114

@@ -50,23 +119,72 @@ static inline int process_pack(struct udp_conn *conn,
50119
while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
51120
entry = result.data;
52121

122+
appended_address_buffer = NULL;
123+
source_address = NULL;
53124

54125
ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
55126

56127
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
57128
ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
58129
}
59130

131+
if (ctx->source_address_key != NULL) {
132+
source_address = flb_connection_get_remote_address(conn->connection);
133+
}
134+
60135
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
61136
if (entry.type == MSGPACK_OBJECT_MAP) {
62-
ret = flb_log_event_encoder_set_body_from_msgpack_object(
63-
ctx->log_encoder, &entry);
137+
if (source_address != NULL) {
138+
msgpack_sbuffer_init(&sbuf);
139+
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
140+
141+
len = entry.via.map.size;
142+
msgpack_pack_map(&pck, len);
143+
144+
for (i=0; i<len; i++) {
145+
msgpack_pack_object(&pck, entry.via.map.ptr[i].key);
146+
msgpack_pack_object(&pck, entry.via.map.ptr[i].val);
147+
}
148+
149+
ret = append_message_to_record_data(&appended_address_buffer,
150+
&appended_address_size,
151+
ctx->source_address_key,
152+
sbuf.data,
153+
sbuf.size,
154+
source_address,
155+
strlen(source_address),
156+
MSGPACK_OBJECT_STR);
157+
msgpack_sbuffer_destroy(&sbuf);
158+
}
159+
160+
if (ret == FLB_MAP_EXPANSION_ERROR) {
161+
flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret);
162+
}
163+
164+
if (appended_address_buffer != NULL) {
165+
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
166+
ctx->log_encoder, appended_address_buffer, appended_address_size);
167+
}
168+
else {
169+
ret = flb_log_event_encoder_set_body_from_msgpack_object(
170+
ctx->log_encoder, &entry);
171+
}
64172
}
65173
else if (entry.type == MSGPACK_OBJECT_ARRAY) {
66-
ret = flb_log_event_encoder_append_body_values(
174+
if (source_address != NULL) {
175+
ret = flb_log_event_encoder_append_body_values(
176+
ctx->log_encoder,
177+
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
178+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry),
179+
FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key),
180+
FLB_LOG_EVENT_CSTRING_VALUE(source_address));
181+
}
182+
else {
183+
ret = flb_log_event_encoder_append_body_values(
67184
ctx->log_encoder,
68185
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
69186
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry));
187+
}
70188
}
71189
else {
72190
ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE;
@@ -76,6 +194,10 @@ static inline int process_pack(struct udp_conn *conn,
76194
ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
77195
}
78196

197+
if (appended_address_buffer != NULL) {
198+
flb_free(appended_address_buffer);
199+
}
200+
79201
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
80202
break;
81203
}

plugins/in_udp/udp_conn.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525

2626
#define FLB_IN_UDP_CHUNK "32768"
2727

28+
#define FLB_MAP_EXPAND_SUCCESS 0
29+
#define FLB_MAP_NOT_MODIFIED -1
30+
#define FLB_MAP_EXPANSION_ERROR -2
31+
#define FLB_MAP_EXPANSION_INVALID_VALUE_TYPE -3
2832

2933
struct udp_conn_stream {
3034
char *tag;

tests/runtime/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ if(FLB_OUT_LIB)
5252
FLB_RT_TEST(FLB_IN_SPLUNK "in_splunk.c")
5353
FLB_RT_TEST(FLB_IN_SYSLOG "in_syslog.c")
5454
FLB_RT_TEST(FLB_IN_TAIL "in_tail.c")
55+
FLB_RT_TEST(FLB_IN_UDP "in_udp.c")
5556
FLB_RT_TEST(FLB_IN_TCP "in_tcp.c")
5657
FLB_RT_TEST(FLB_IN_FORWARD "in_forward.c")
5758
FLB_RT_TEST(FLB_IN_FLUENTBIT_METRICS "in_fluentbit_metrics.c")

0 commit comments

Comments
 (0)