@@ -77,6 +77,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
7777 int ret_j ;
7878 int i ;
7979 int len ;
80+ int key_len ;
8081 int entries = 0 ;
8182 int skip_entries = 0 ;
8283 int rows = 0 ;
@@ -100,6 +101,15 @@ static int in_systemd_collect(struct flb_input_instance *ins,
100101 const void * data ;
101102 struct flb_systemd_config * ctx = in_context ;
102103 struct flb_time tm ;
104+ struct cfl_kvlist * kvlist = NULL ;
105+ struct cfl_variant * cfl_val = NULL ;
106+ struct cfl_list * head ;
107+ struct cfl_kvpair * kvpair = NULL ;
108+ struct cfl_variant * v = NULL ;
109+ struct cfl_array * array = NULL ;
110+ struct cfl_variant * tmp_val = NULL ;
111+ flb_sds_t list_key = NULL ;
112+ flb_sds_t search_key = NULL ;
103113
104114 /* Restricted by mem_buf_limit */
105115 if (flb_input_buf_paused (ins ) == FLB_TRUE ) {
@@ -200,6 +210,13 @@ static int in_systemd_collect(struct flb_input_instance *ins,
200210 ret = flb_log_event_encoder_set_timestamp (ctx -> log_encoder , & tm );
201211 }
202212
213+ /* create an empty kvlist as the labels */
214+ kvlist = cfl_kvlist_create ();
215+ if (!kvlist ) {
216+ flb_plg_error (ctx -> ins , "error allocating kvlist" );
217+ break ;
218+ }
219+
203220 /* Pack every field in the entry */
204221 entries = 0 ;
205222 skip_entries = 0 ;
@@ -218,11 +235,7 @@ static int in_systemd_collect(struct flb_input_instance *ins,
218235 }
219236
220237 len = (sep - key );
221-
222- if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
223- ret = flb_log_event_encoder_append_body_string_length (
224- ctx -> log_encoder , len );
225- }
238+ key_len = len ;
226239
227240 if (ctx -> lowercase == FLB_TRUE ) {
228241 /*
@@ -238,31 +251,137 @@ static int in_systemd_collect(struct flb_input_instance *ins,
238251 for (i = 0 ; i < len ; i ++ ) {
239252 buf [i ] = tolower (key [i ]);
240253 }
241-
242- if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
243- ret = flb_log_event_encoder_append_body_string_body (
244- ctx -> log_encoder , buf , len );
245- }
254+ list_key = flb_sds_create_len (buf , key_len );
246255 }
247256 else {
248- if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
249- ret = flb_log_event_encoder_append_body_string_body (
250- ctx -> log_encoder , (char * ) key , len );
251- }
257+ list_key = flb_sds_create_len (key , key_len );
258+ }
259+
260+ if (!list_key ) {
261+ continue ;
252262 }
253263
264+ /* Check existence */
265+ cfl_val = NULL ;
266+ cfl_val = cfl_kvlist_fetch_s (kvlist , list_key , key_len );
267+
254268 val = sep + 1 ;
255269 len = length - (sep - key ) - 1 ;
256270
257- if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
258- ret = flb_log_event_encoder_append_body_string (
259- ctx -> log_encoder , (char * ) val , len );
271+ /* Initialize variable for cfl_variant operations. */
272+ search_key = NULL ;
273+ tmp_val = NULL ;
274+
275+ /* Store cfl_kvlist format at first to detect duplicated keys */
276+ if (cfl_val ) {
277+ switch (cfl_val -> type ) {
278+ case CFL_VARIANT_STRING :
279+ tmp_val = cfl_variant_create_from_string (cfl_val -> data .as_string );
280+ if (!tmp_val ) {
281+ continue ;
282+ }
283+ break ;
284+ case CFL_VARIANT_ARRAY :
285+ /* Just a reference */
286+ tmp_val = cfl_val ;
287+ break ;
288+ default :
289+ /* nop */
290+ }
291+
292+ switch (tmp_val -> type ) {
293+ case CFL_VARIANT_STRING :
294+ search_key = flb_sds_create_len (list_key , key_len );
295+ if (search_key != NULL ) {
296+ cfl_kvlist_remove (kvlist , list_key );
297+ }
298+ flb_sds_destroy (search_key );
299+
300+ array = cfl_array_create (8 );
301+ if (!array ) {
302+ cfl_variant_destroy (tmp_val );
303+ continue ;
304+ }
305+ if (cfl_array_resizable (array , CFL_TRUE ) == -1 ) {
306+ cfl_array_destroy (array );
307+ cfl_variant_destroy (tmp_val );
308+ continue ;
309+ }
310+
311+ cfl_array_append_string_s (array ,
312+ tmp_val -> data .as_string ,
313+ strlen (tmp_val -> data .as_string ),
314+ CFL_FALSE );
315+ cfl_array_append_string_s (array , (char * )val , strlen (val ), CFL_TRUE );
316+ cfl_kvlist_insert_array_s (kvlist , list_key , key_len , array );
317+ cfl_variant_destroy (tmp_val );
318+ break ;
319+ case CFL_VARIANT_ARRAY :
320+ /* Just appending the newly arrived field(s) */
321+ array = tmp_val -> data .as_array ;
322+ cfl_array_append_string_s (array , (char * )val , strlen (val ), CFL_TRUE );
323+ break ;
324+ default :
325+ /* nop */
326+ }
327+ }
328+ else {
329+ cfl_kvlist_insert_string_s (kvlist , list_key , key_len ,
330+ (char * )val , strlen (val ), CFL_FALSE );
260331 }
332+ flb_sds_destroy (list_key );
261333
262334 entries ++ ;
263335 }
264336 rows ++ ;
265337
338+ /* Interpret cfl_kvlist as logs type of events later. */
339+ cfl_list_foreach (head , & kvlist -> list ) {
340+ kvpair = cfl_list_entry (head , struct cfl_kvpair , _head );
341+ if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
342+ ret = flb_log_event_encoder_append_body_string_length (
343+ ctx -> log_encoder , cfl_sds_len (kvpair -> key ));
344+ }
345+
346+ if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
347+ ret = flb_log_event_encoder_append_body_string_body (
348+ ctx -> log_encoder , kvpair -> key , cfl_sds_len (kvpair -> key ));
349+ }
350+
351+ v = kvpair -> val ;
352+ if (v -> type == CFL_VARIANT_STRING ) {
353+ if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
354+ ret = flb_log_event_encoder_append_body_string (
355+ ctx -> log_encoder , v -> data .as_string , cfl_variant_size_get (v ));
356+ }
357+ }
358+ else if (v -> type == CFL_VARIANT_ARRAY ) {
359+ if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
360+ ret = flb_log_event_encoder_body_begin_array (ctx -> log_encoder );
361+ }
362+
363+ array = v -> data .as_array ;
364+ for (i = 0 ; i < array -> entry_count ; i ++ ) {
365+ if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
366+ if (array -> entries [i ]-> type != CFL_VARIANT_STRING ) {
367+ continue ;
368+ }
369+ ret = flb_log_event_encoder_append_body_string (
370+ ctx -> log_encoder , array -> entries [i ]-> data .as_string ,
371+ cfl_variant_size_get (array -> entries [i ]));
372+ }
373+ }
374+
375+ if (ret == FLB_EVENT_ENCODER_SUCCESS ) {
376+ ret = flb_log_event_encoder_body_commit_array (ctx -> log_encoder );
377+ }
378+ }
379+ }
380+
381+ if (kvlist ) {
382+ cfl_kvlist_destroy (kvlist );
383+ }
384+
266385 if (skip_entries > 0 ) {
267386 flb_plg_error (ctx -> ins , "Skip %d broken entries" , skip_entries );
268387 }
0 commit comments