diff --git a/plugins/out_loki/loki.c b/plugins/out_loki/loki.c index bcc89f37298..d7d9db524d4 100644 --- a/plugins/out_loki/loki.c +++ b/plugins/out_loki/loki.c @@ -183,7 +183,7 @@ void flb_loki_kv_destroy(struct flb_loki_kv *kv) flb_free(kv); } -int flb_loki_kv_append(struct flb_loki *ctx, char *key, char *val) +int flb_loki_kv_append(struct flb_loki *ctx, struct mk_list *list, char *key, char *val) { int ra_count = 0; int k_len; @@ -276,7 +276,7 @@ int flb_loki_kv_append(struct flb_loki *ctx, char *key, char *val) return -1; } } - mk_list_add(&kv->_head, &ctx->labels_list); + mk_list_add(&kv->_head, list); /* return the number of record accessor values */ return ra_count; @@ -291,6 +291,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx) mk_list_foreach_safe(head, tmp, &ctx->labels_list) { kv = mk_list_entry(head, struct flb_loki_kv, _head); + /* unlink and destroy */ + mk_list_del(&kv->_head); + flb_loki_kv_destroy(kv); + } + mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) { + kv = mk_list_entry(head, struct flb_loki_kv, _head); + /* unlink and destroy */ mk_list_del(&kv->_head); flb_loki_kv_destroy(kv); @@ -337,25 +344,17 @@ static int pack_label_key(msgpack_packer *mp_pck, char *key, int key_len) return 0; } -static flb_sds_t pack_labels(struct flb_loki *ctx, - msgpack_packer *mp_pck, - char *tag, int tag_len, - msgpack_object *map) +static void pack_kv(struct flb_loki *ctx, + msgpack_packer *mp_pck, + char *tag, int tag_len, + msgpack_object *map, + struct flb_mp_map_header *mh, + struct mk_list *list) { - int i; - flb_sds_t ra_val; struct mk_list *head; - struct flb_ra_value *rval = NULL; struct flb_loki_kv *kv; - msgpack_object k; - msgpack_object v; - struct flb_mp_map_header mh; - - - /* Initialize dynamic map header */ - flb_mp_map_header_init(&mh, mp_pck); - - mk_list_foreach(head, &ctx->labels_list) { + flb_sds_t ra_val; + mk_list_foreach(head, list) { kv = mk_list_entry(head, struct flb_loki_kv, _head); /* record accessor key/value pair */ @@ -369,7 +368,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx, } else { /* Pack the key and value */ - flb_mp_map_header_append(&mh); + flb_mp_map_header_append(mh); /* We skip the first '$' character since it won't be valid in Loki */ pack_label_key(mp_pck, kv->key_normalized, @@ -390,7 +389,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx, * invalid or empty value, on that case the k/v is skipped. */ if (kv->val_type == FLB_LOKI_KV_STR) { - flb_mp_map_header_append(&mh); + flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(kv->key)); msgpack_pack_str_body(mp_pck, kv->key, flb_sds_len(kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(kv->str_val)); @@ -403,7 +402,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx, flb_plg_debug(ctx->ins, "could not translate record accessor"); } else { - flb_mp_map_header_append(&mh); + flb_mp_map_header_append(mh); msgpack_pack_str(mp_pck, flb_sds_len(kv->key)); msgpack_pack_str_body(mp_pck, kv->key, flb_sds_len(kv->key)); msgpack_pack_str(mp_pck, flb_sds_len(ra_val)); @@ -415,6 +414,35 @@ static flb_sds_t pack_labels(struct flb_loki *ctx, } } } +} + +static flb_sds_t pack_structured_metadata(struct flb_loki *ctx, + msgpack_packer *mp_pck, + char *tag, int tag_len, + msgpack_object *map) +{ + struct flb_mp_map_header mh; + /* Initialize dynamic map header */ + flb_mp_map_header_init(&mh, mp_pck); + pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list); + flb_mp_map_header_end(&mh); + return 0; +} + +static flb_sds_t pack_labels(struct flb_loki *ctx, + msgpack_packer *mp_pck, + char *tag, int tag_len, + msgpack_object *map) +{ + int i; + struct flb_ra_value *rval = NULL; + msgpack_object k; + msgpack_object v; + struct flb_mp_map_header mh; + + /* Initialize dynamic map header */ + flb_mp_map_header_init(&mh, mp_pck); + pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->labels_list); if (ctx->auto_kubernetes_labels == FLB_TRUE) { rval = flb_ra_get_value_object(ctx->ra_k8s, *map); @@ -490,7 +518,7 @@ static int create_label_map_entry(struct flb_loki *ctx, printf("label_key=%s val_str=%s\n", label_key, val_str); */ - ret = flb_loki_kv_append(ctx, label_key, val_str); + ret = flb_loki_kv_append(ctx, &ctx->labels_list, label_key, val_str); flb_sds_destroy(label_key); flb_sds_destroy(val_str); if (ret == -1) { @@ -686,68 +714,92 @@ static int load_label_map_path(struct flb_loki *ctx, flb_sds_t path, int *ra_use return 0; } -static int parse_labels(struct flb_loki *ctx) +static int parse_kv(struct flb_loki *ctx, struct mk_list *kv, struct mk_list *list, int *ra_used) { int ret; - int ra_used = 0; char *p; flb_sds_t key; flb_sds_t val; struct mk_list *head; struct flb_slist_entry *entry; - flb_loki_kv_init(&ctx->labels_list); - - if (ctx->labels) { - mk_list_foreach(head, ctx->labels) { - entry = mk_list_entry(head, struct flb_slist_entry, _head); - - /* record accessor label key ? */ - if (entry->str[0] == '$') { - ret = flb_loki_kv_append(ctx, entry->str, NULL); - if (ret == -1) { - return -1; - } - else if (ret > 0) { - ra_used++; - } - continue; - } + if (ctx == NULL || list == NULL || ra_used == NULL) { + return -1; + } - p = strchr(entry->str, '='); - if (!p) { - flb_plg_error(ctx->ins, "invalid key value pair on '%s'", - entry->str); - return -1; - } + mk_list_foreach(head, kv) { + entry = mk_list_entry(head, struct flb_slist_entry, _head); - key = flb_sds_create_size((p - entry->str) + 1); - flb_sds_cat(key, entry->str, p - entry->str); - val = flb_sds_create(p + 1); - if (!key) { - flb_plg_error(ctx->ins, - "invalid key value pair on '%s'", - entry->str); + /* record accessor label key ? */ + if (entry->str[0] == '$') { + ret = flb_loki_kv_append(ctx, list, entry->str, NULL); + if (ret == -1) { return -1; } - if (!val || flb_sds_len(val) == 0) { - flb_plg_error(ctx->ins, - "invalid key value pair on '%s'", - entry->str); - flb_sds_destroy(key); - return -1; + else if (ret > 0) { + (*ra_used)++; } + continue; + } + + p = strchr(entry->str, '='); + if (!p) { + flb_plg_error(ctx->ins, "invalid key value pair on '%s'", + entry->str); + return -1; + } - ret = flb_loki_kv_append(ctx, key, val); + key = flb_sds_create_size((p - entry->str) + 1); + flb_sds_cat(key, entry->str, p - entry->str); + val = flb_sds_create(p + 1); + if (!key) { + flb_plg_error(ctx->ins, + "invalid key value pair on '%s'", + entry->str); + return -1; + } + if (!val || flb_sds_len(val) == 0) { + flb_plg_error(ctx->ins, + "invalid key value pair on '%s'", + entry->str); flb_sds_destroy(key); - flb_sds_destroy(val); + return -1; + } + ret = flb_loki_kv_append(ctx, list, key, val); + flb_sds_destroy(key); + flb_sds_destroy(val); - if (ret == -1) { - return -1; - } - else if (ret > 0) { - ra_used++; - } + if (ret == -1) { + return -1; + } + else if (ret > 0) { + (*ra_used)++; + } + } + return 0; +} + +static int parse_labels(struct flb_loki *ctx) +{ + int ret; + int ra_used = 0; + struct mk_list *head; + struct flb_slist_entry *entry; + + flb_loki_kv_init(&ctx->labels_list); + flb_loki_kv_init(&ctx->structured_metadata_list); + + if (ctx->structured_metadata) { + ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used); + if (ret == -1) { + return -1; + } + } + + if (ctx->labels) { + ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used); + if (ret == -1) { + return -1; } } @@ -761,7 +813,7 @@ static int parse_labels(struct flb_loki *ctx) return -1; } - ret = flb_loki_kv_append(ctx, entry->str, NULL); + ret = flb_loki_kv_append(ctx, &ctx->labels_list, entry->str, NULL); if (ret == -1) { return -1; } @@ -918,6 +970,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins, } ctx->ins = ins; flb_loki_kv_init(&ctx->labels_list); + flb_loki_kv_init(&ctx->structured_metadata_list); /* Register context with plugin instance */ flb_output_set_context(ins, ctx); @@ -1431,6 +1484,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, * } * ] * } + * + * As of Loki 3.0, log entries may optionally contain a third element which is a JSON + * object indicating structured metadata: + * + * "values": [ + * [ "", "", {"trace_id": "0242ac120002"}] + * ] */ ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); @@ -1479,11 +1539,14 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_pack_array(&mp_pck, 2); + msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2); /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id); + if (ctx->structured_metadata) { + pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL); + } } } else { @@ -1512,11 +1575,14 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, msgpack_pack_str_body(&mp_pck, "values", 6); msgpack_pack_array(&mp_pck, 1); - msgpack_pack_array(&mp_pck, 2); + msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2); /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id); + if (ctx->structured_metadata) { + pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body); + } } } @@ -1808,6 +1874,12 @@ static struct flb_config_map config_map[] = { "labels for API requests. If no value is set, the default label is 'job=fluent-bit'" }, + { + FLB_CONFIG_MAP_CLIST, "structured_metadata", NULL, + 0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata), + "optional structured metadata fields for API requests." + }, + { FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false", 0, FLB_TRUE, offsetof(struct flb_loki, auto_kubernetes_labels), diff --git a/plugins/out_loki/loki.h b/plugins/out_loki/loki.h index fa3eed14d3d..eab0fa907d6 100644 --- a/plugins/out_loki/loki.h +++ b/plugins/out_loki/loki.h @@ -77,6 +77,7 @@ struct flb_loki { /* Labels */ struct mk_list *labels; struct mk_list *label_keys; + struct mk_list *structured_metadata; struct mk_list *remove_keys; flb_sds_t label_map_path; @@ -86,11 +87,12 @@ struct flb_loki { char *tcp_host; int out_line_format; int out_drop_single_key; - int ra_used; /* number of record accessor label keys */ - struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */ - struct mk_list labels_list; /* list of flb_loki_kv nodes */ - struct mk_list remove_keys_derived; /* remove_keys with label RAs */ - struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */ + int ra_used; /* number of record accessor label keys */ + struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */ + struct mk_list labels_list; /* list of flb_loki_kv nodes */ + struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */ + struct mk_list remove_keys_derived; /* remove_keys with label RAs */ + struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */ struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */ struct cfl_list dynamic_tenant_list;