Skip to content

Commit 518204b

Browse files
committed
in_systemd: Process enumerated data as cfl_kvlist(s) at first
This is because systemctl's -o json-pretty or -o json converts duplicated keys' values as array(s). To avoid generating the duplicated key(s) does not resolve this issue. Instead, we need to store as cfl_kvlist at first to detect duplicated keys on enumerated data in journal storage. Then, we also need to generate as msgpack's array format when the duplicated key(s) were detected and translated as array format when storing as cfl_kvlist(s). Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent d873502 commit 518204b

File tree

1 file changed

+138
-17
lines changed

1 file changed

+138
-17
lines changed

plugins/in_systemd/systemd.c

Lines changed: 138 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
7777
int ret_j;
7878
int i;
7979
int len;
80+
int key_len;
8081
int entries = 0;
8182
int skip_entries = 0;
8283
int rows = 0;
@@ -100,6 +101,15 @@ static int in_systemd_collect(struct flb_input_instance *ins,
100101
const void *data;
101102
struct flb_systemd_config *ctx = in_context;
102103
struct flb_time tm;
104+
struct cfl_kvlist *kvlist = NULL;
105+
struct cfl_variant *cfl_val = NULL;
106+
struct cfl_list *head;
107+
struct cfl_kvpair *kvpair = NULL;
108+
struct cfl_variant *v = NULL;
109+
struct cfl_array *array = NULL;
110+
struct cfl_variant *tmp_val = NULL;
111+
flb_sds_t list_key = NULL;
112+
flb_sds_t search_key = NULL;
103113

104114
/* Restricted by mem_buf_limit */
105115
if (flb_input_buf_paused(ins) == FLB_TRUE) {
@@ -200,6 +210,13 @@ static int in_systemd_collect(struct flb_input_instance *ins,
200210
ret = flb_log_event_encoder_set_timestamp(ctx->log_encoder, &tm);
201211
}
202212

213+
/* create an empty kvlist as the labels */
214+
kvlist = cfl_kvlist_create();
215+
if (!kvlist) {
216+
flb_plg_error(ctx->ins, "error allocating kvlist");
217+
break;
218+
}
219+
203220
/* Pack every field in the entry */
204221
entries = 0;
205222
skip_entries = 0;
@@ -218,11 +235,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
218235
}
219236

220237
len = (sep - key);
221-
222-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
223-
ret = flb_log_event_encoder_append_body_string_length(
224-
ctx->log_encoder, len);
225-
}
238+
key_len = len;
226239

227240
if (ctx->lowercase == FLB_TRUE) {
228241
/*
@@ -238,31 +251,139 @@ static int in_systemd_collect(struct flb_input_instance *ins,
238251
for (i = 0; i < len; i++) {
239252
buf[i] = tolower(key[i]);
240253
}
241-
242-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
243-
ret = flb_log_event_encoder_append_body_string_body(
244-
ctx->log_encoder, buf, len);
245-
}
254+
list_key = flb_sds_create_len(buf, key_len);
246255
}
247256
else {
248-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
249-
ret = flb_log_event_encoder_append_body_string_body(
250-
ctx->log_encoder, (char *) key, len);
251-
}
257+
list_key = flb_sds_create_len(key, key_len);
258+
}
259+
260+
if (!list_key) {
261+
continue;
252262
}
253263

264+
/* Check existence */
265+
cfl_val = NULL;
266+
cfl_val = cfl_kvlist_fetch_s(kvlist, list_key, key_len);
267+
254268
val = sep + 1;
255269
len = length - (sep - key) - 1;
256270

257-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
258-
ret = flb_log_event_encoder_append_body_string(
259-
ctx->log_encoder, (char *) val, len);
271+
/* Initialize variable for cfl_variant operations. */
272+
search_key = NULL;
273+
tmp_val = NULL;
274+
275+
/* Store cfl_kvlist format at first to detect duplicated keys */
276+
if (cfl_val) {
277+
switch(cfl_val->type) {
278+
case CFL_VARIANT_STRING:
279+
tmp_val = cfl_variant_create_from_string(cfl_val->data.as_string);
280+
if (!tmp_val) {
281+
continue;
282+
}
283+
break;
284+
case CFL_VARIANT_ARRAY:
285+
/* Just a reference */
286+
tmp_val = cfl_val;
287+
break;
288+
default:
289+
/* nop */
290+
break;
291+
}
292+
293+
switch(tmp_val->type) {
294+
case CFL_VARIANT_STRING:
295+
search_key = flb_sds_create_len(list_key, key_len);
296+
if (search_key != NULL) {
297+
cfl_kvlist_remove(kvlist, list_key);
298+
}
299+
flb_sds_destroy(search_key);
300+
301+
array = cfl_array_create(8);
302+
if (!array) {
303+
cfl_variant_destroy(tmp_val);
304+
continue;
305+
}
306+
if (cfl_array_resizable(array, CFL_TRUE) == -1) {
307+
cfl_array_destroy(array);
308+
cfl_variant_destroy(tmp_val);
309+
continue;
310+
}
311+
312+
cfl_array_append_string_s(array,
313+
tmp_val->data.as_string,
314+
strlen(tmp_val->data.as_string),
315+
CFL_FALSE);
316+
cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE);
317+
cfl_kvlist_insert_array_s(kvlist, list_key, key_len, array);
318+
cfl_variant_destroy(tmp_val);
319+
break;
320+
case CFL_VARIANT_ARRAY:
321+
/* Just appending the newly arrived field(s) */
322+
array = tmp_val->data.as_array;
323+
cfl_array_append_string_s(array, (char *)val, strlen(val), CFL_FALSE);
324+
break;
325+
default:
326+
/* nop */
327+
break;
328+
}
329+
}
330+
else {
331+
cfl_kvlist_insert_string_s(kvlist, list_key, key_len,
332+
(char *)val, strlen(val), CFL_FALSE);
260333
}
334+
flb_sds_destroy(list_key);
261335

262336
entries++;
263337
}
264338
rows++;
265339

340+
/* Interpret cfl_kvlist as logs type of events later. */
341+
cfl_list_foreach(head, &kvlist->list) {
342+
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
343+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
344+
ret = flb_log_event_encoder_append_body_string_length(
345+
ctx->log_encoder, cfl_sds_len(kvpair->key));
346+
}
347+
348+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
349+
ret = flb_log_event_encoder_append_body_string_body(
350+
ctx->log_encoder, kvpair->key, cfl_sds_len(kvpair->key));
351+
}
352+
353+
v = kvpair->val;
354+
if (v->type == CFL_VARIANT_STRING) {
355+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
356+
ret = flb_log_event_encoder_append_body_string(
357+
ctx->log_encoder, v->data.as_string, cfl_variant_size_get(v));
358+
}
359+
}
360+
else if (v->type == CFL_VARIANT_ARRAY) {
361+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
362+
ret = flb_log_event_encoder_body_begin_array(ctx->log_encoder);
363+
}
364+
365+
array = v->data.as_array;
366+
for (i = 0; i < array->entry_count; i++) {
367+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
368+
if (array->entries[i]->type != CFL_VARIANT_STRING) {
369+
continue;
370+
}
371+
ret = flb_log_event_encoder_append_body_string(
372+
ctx->log_encoder, array->entries[i]->data.as_string,
373+
cfl_variant_size_get(array->entries[i]));
374+
}
375+
}
376+
377+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
378+
ret = flb_log_event_encoder_body_commit_array(ctx->log_encoder);
379+
}
380+
}
381+
}
382+
383+
if (kvlist) {
384+
cfl_kvlist_destroy(kvlist);
385+
}
386+
266387
if (skip_entries > 0) {
267388
flb_plg_error(ctx->ins, "Skip %d broken entries", skip_entries);
268389
}

0 commit comments

Comments
 (0)