Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 143 additions & 71 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void flb_loki_kv_destroy(struct flb_loki_kv *kv)
flb_free(kv);
}

int flb_loki_kv_append(struct flb_loki *ctx, char *key, char *val)
int flb_loki_kv_append(struct flb_loki *ctx, struct mk_list *list, char *key, char *val)
{
int ra_count = 0;
int k_len;
Expand Down Expand Up @@ -276,7 +276,7 @@ int flb_loki_kv_append(struct flb_loki *ctx, char *key, char *val)
return -1;
}
}
mk_list_add(&kv->_head, &ctx->labels_list);
mk_list_add(&kv->_head, list);

/* return the number of record accessor values */
return ra_count;
Expand All @@ -291,6 +291,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
mk_list_foreach_safe(head, tmp, &ctx->labels_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
}
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
Expand Down Expand Up @@ -337,25 +344,17 @@ static int pack_label_key(msgpack_packer *mp_pck, char *key, int key_len)
return 0;
}

static flb_sds_t pack_labels(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map)
static void pack_kv(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map,
struct flb_mp_map_header *mh,
struct mk_list *list)
{
int i;
flb_sds_t ra_val;
struct mk_list *head;
struct flb_ra_value *rval = NULL;
struct flb_loki_kv *kv;
msgpack_object k;
msgpack_object v;
struct flb_mp_map_header mh;


/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);

mk_list_foreach(head, &ctx->labels_list) {
flb_sds_t ra_val;
mk_list_foreach(head, list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* record accessor key/value pair */
Expand All @@ -369,7 +368,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
}
else {
/* Pack the key and value */
flb_mp_map_header_append(&mh);
flb_mp_map_header_append(mh);

/* We skip the first '$' character since it won't be valid in Loki */
pack_label_key(mp_pck, kv->key_normalized,
Expand All @@ -390,7 +389,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
* invalid or empty value, on that case the k/v is skipped.
*/
if (kv->val_type == FLB_LOKI_KV_STR) {
flb_mp_map_header_append(&mh);
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(kv->key));
msgpack_pack_str_body(mp_pck, kv->key, flb_sds_len(kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(kv->str_val));
Expand All @@ -403,7 +402,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
flb_plg_debug(ctx->ins, "could not translate record accessor");
}
else {
flb_mp_map_header_append(&mh);
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(kv->key));
msgpack_pack_str_body(mp_pck, kv->key, flb_sds_len(kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(ra_val));
Expand All @@ -415,6 +414,35 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
}
}
}
}

static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map)
{
struct flb_mp_map_header mh;
/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
flb_mp_map_header_end(&mh);
return 0;
}

static flb_sds_t pack_labels(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map)
{
int i;
struct flb_ra_value *rval = NULL;
msgpack_object k;
msgpack_object v;
struct flb_mp_map_header mh;

/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->labels_list);

if (ctx->auto_kubernetes_labels == FLB_TRUE) {
rval = flb_ra_get_value_object(ctx->ra_k8s, *map);
Expand Down Expand Up @@ -490,7 +518,7 @@ static int create_label_map_entry(struct flb_loki *ctx,
printf("label_key=%s val_str=%s\n", label_key, val_str);
*/

ret = flb_loki_kv_append(ctx, label_key, val_str);
ret = flb_loki_kv_append(ctx, &ctx->labels_list, label_key, val_str);
flb_sds_destroy(label_key);
flb_sds_destroy(val_str);
if (ret == -1) {
Expand Down Expand Up @@ -686,68 +714,92 @@ static int load_label_map_path(struct flb_loki *ctx, flb_sds_t path, int *ra_use
return 0;
}

static int parse_labels(struct flb_loki *ctx)
static int parse_kv(struct flb_loki *ctx, struct mk_list *kv, struct mk_list *list, int *ra_used)
{
int ret;
int ra_used = 0;
char *p;
flb_sds_t key;
flb_sds_t val;
struct mk_list *head;
struct flb_slist_entry *entry;

flb_loki_kv_init(&ctx->labels_list);

if (ctx->labels) {
mk_list_foreach(head, ctx->labels) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);

/* record accessor label key ? */
if (entry->str[0] == '$') {
ret = flb_loki_kv_append(ctx, entry->str, NULL);
if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
continue;
}
if (ctx == NULL || list == NULL || ra_used == NULL) {
return -1;
}

p = strchr(entry->str, '=');
if (!p) {
flb_plg_error(ctx->ins, "invalid key value pair on '%s'",
entry->str);
return -1;
}
mk_list_foreach(head, kv) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);

key = flb_sds_create_size((p - entry->str) + 1);
flb_sds_cat(key, entry->str, p - entry->str);
val = flb_sds_create(p + 1);
if (!key) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
/* record accessor label key ? */
if (entry->str[0] == '$') {
ret = flb_loki_kv_append(ctx, list, entry->str, NULL);
if (ret == -1) {
return -1;
}
if (!val || flb_sds_len(val) == 0) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
flb_sds_destroy(key);
return -1;
else if (ret > 0) {
(*ra_used)++;
}
continue;
}

p = strchr(entry->str, '=');
if (!p) {
flb_plg_error(ctx->ins, "invalid key value pair on '%s'",
entry->str);
return -1;
}

ret = flb_loki_kv_append(ctx, key, val);
key = flb_sds_create_size((p - entry->str) + 1);
flb_sds_cat(key, entry->str, p - entry->str);
val = flb_sds_create(p + 1);
if (!key) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
return -1;
}
if (!val || flb_sds_len(val) == 0) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
flb_sds_destroy(key);
flb_sds_destroy(val);
return -1;
}
ret = flb_loki_kv_append(ctx, list, key, val);
flb_sds_destroy(key);
flb_sds_destroy(val);

if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
if (ret == -1) {
return -1;
}
else if (ret > 0) {
(*ra_used)++;
}
}
return 0;
}

static int parse_labels(struct flb_loki *ctx)
{
int ret;
int ra_used = 0;
struct mk_list *head;
struct flb_slist_entry *entry;

flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);

if (ctx->structured_metadata) {
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
if (ret == -1) {
return -1;
}
}

if (ctx->labels) {
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
if (ret == -1) {
return -1;
}
}

Expand All @@ -761,7 +813,7 @@ static int parse_labels(struct flb_loki *ctx)
return -1;
}

ret = flb_loki_kv_append(ctx, entry->str, NULL);
ret = flb_loki_kv_append(ctx, &ctx->labels_list, entry->str, NULL);
if (ret == -1) {
return -1;
}
Expand Down Expand Up @@ -918,6 +970,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
}
ctx->ins = ins;
flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);
Expand Down Expand Up @@ -1431,6 +1484,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
* }
* ]
* }
*
* As of Loki 3.0, log entries may optionally contain a third element which is a JSON
* object indicating structured metadata:
*
* "values": [
* [ "<unix epoch in nanoseconds>", "<log line>", {"trace_id": "0242ac120002"}]
* ]
*/

ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
Expand Down Expand Up @@ -1479,11 +1539,14 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_pack_array(&mp_pck, 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
}
}
else {
Expand Down Expand Up @@ -1512,11 +1575,14 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
msgpack_pack_str_body(&mp_pck, "values", 6);
msgpack_pack_array(&mp_pck, 1);

msgpack_pack_array(&mp_pck, 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
}
}

Expand Down Expand Up @@ -1808,6 +1874,12 @@ static struct flb_config_map config_map[] = {
"labels for API requests. If no value is set, the default label is 'job=fluent-bit'"
},

{
FLB_CONFIG_MAP_CLIST, "structured_metadata", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
"optional structured metadata fields for API requests."
},

{
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",
0, FLB_TRUE, offsetof(struct flb_loki, auto_kubernetes_labels),
Expand Down
12 changes: 7 additions & 5 deletions plugins/out_loki/loki.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct flb_loki {
/* Labels */
struct mk_list *labels;
struct mk_list *label_keys;
struct mk_list *structured_metadata;
struct mk_list *remove_keys;

flb_sds_t label_map_path;
Expand All @@ -86,11 +87,12 @@ struct flb_loki {
char *tcp_host;
int out_line_format;
int out_drop_single_key;
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */

struct cfl_list dynamic_tenant_list;
Expand Down