Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions src/flb_metrics_exporter.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ int flb_me_destroy(struct flb_me *me)
struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
{
int ret;
struct mk_list *head;
struct mk_list *head, *processor_head;
struct flb_input_instance *i; /* inputs */
struct flb_filter_instance *f; /* filter */
struct flb_processor_unit *pu; /* processors */
struct flb_filter_instance *f, *pf; /* filter */
struct flb_output_instance *o; /* output */
struct cmt *cmt;

Expand Down Expand Up @@ -308,6 +309,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
cmt_destroy(cmt);
return NULL;
}

mk_list_foreach(processor_head, &i->processor->logs) {
pu = mk_list_entry(processor_head, struct flb_processor_unit, _head);
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
pf = (struct flb_filter_instance *) pu->ctx;
ret = cmt_cat(cmt, pf->cmt);
if (ret == -1) {
flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf));
cmt_destroy(cmt);
return NULL;
}
}
}
}

mk_list_foreach(head, &ctx->filters) {
Expand All @@ -330,6 +344,19 @@ struct cmt *flb_me_get_cmetrics(struct flb_config *ctx)
cmt_destroy(cmt);
return NULL;
}

mk_list_foreach(processor_head, &o->processor->logs) {
pu = mk_list_entry(processor_head, struct flb_processor_unit, _head);
if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) {
pf = (struct flb_filter_instance *) pu->ctx;
ret = cmt_cat(cmt, pf->cmt);
if (ret == -1) {
flb_error("[metrics exporter] could not append metrics from %s", flb_filter_name(pf));
cmt_destroy(cmt);
return NULL;
}
}
}
}

return cmt;
Expand Down
59 changes: 58 additions & 1 deletion src/flb_processor.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,13 @@ int flb_processor_run(struct flb_processor *proc,
struct flb_filter_instance *f_ins;
struct flb_processor_instance *p_ins;
struct flb_mp_chunk_cobj *chunk_cobj = NULL;
#ifdef FLB_HAVE_METRICS
int in_records = 0;
int out_records = 0;
int diff = 0;
uint64_t ts;
char *name;
#endif

if (type == FLB_PROCESSOR_LOGS) {
list = &proc->logs;
Expand All @@ -453,6 +460,11 @@ int flb_processor_run(struct flb_processor *proc,
list = &proc->traces;
}

#ifdef FLB_HAVE_METRICS
/* timestamp */
ts = cfl_time_now();
#endif

/* set current data buffer */
cur_buf = data;
cur_size = data_size;
Expand Down Expand Up @@ -493,7 +505,17 @@ int flb_processor_run(struct flb_processor *proc,
proc->data, /* (input/output) instance context */
f_ins->context, /* filter context */
proc->config);

#ifdef FLB_HAVE_METRICS
name = (char *) (flb_filter_name(f_ins));
in_records = flb_mp_count(cur_buf, cur_size);
cmt_counter_add(f_ins->cmt_records, ts, in_records,
1, (char *[]) {name});
cmt_counter_add(f_ins->cmt_bytes, ts, tmp_size,
1, (char *[]) {name});

flb_metrics_sum(FLB_METRIC_N_RECORDS, in_records, f_ins->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, tmp_size, f_ins->metrics);
#endif
/*
* The cb_filter() function return status tells us if something changed
* during it process. The possible values are:
Expand All @@ -520,6 +542,15 @@ int flb_processor_run(struct flb_processor *proc,
*out_buf = NULL;
*out_size = 0;

#ifdef FLB_HAVE_METRICS
/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, in_records,
1, (char *[]) {name});

/* [OLD] Summarize all records removed */
flb_metrics_sum(FLB_METRIC_N_DROPPED,
in_records, f_ins->metrics);
#endif
release_lock(&pu->lock,
FLB_PROCESSOR_LOCK_RETRY_LIMIT,
FLB_PROCESSOR_LOCK_RETRY_DELAY);
Expand All @@ -530,6 +561,32 @@ int flb_processor_run(struct flb_processor *proc,
/* set new buffer */
cur_buf = tmp_buf;
cur_size = tmp_size;
out_records = flb_mp_count(tmp_buf, tmp_size);
#ifdef FLB_HAVE_METRICS
if (out_records > in_records) {
diff = (out_records - in_records);

/* cmetrics */
cmt_counter_add(f_ins->cmt_add_records, ts, diff,
1, (char *[]) {name});

/* [OLD] Summarize new records */
flb_metrics_sum(FLB_METRIC_N_ADDED,
diff, f_ins->metrics);
}
else if (out_records < in_records) {
diff = (in_records - out_records);

/* cmetrics */
cmt_counter_add(f_ins->cmt_drop_records, ts, diff,
1, (char *[]) {name});

/* [OLD] Summarize dropped records */
flb_metrics_sum(FLB_METRIC_N_DROPPED,
diff, f_ins->metrics);
}
#endif

}
else if (ret == FLB_FILTER_NOTOUCH) {
/* keep original data, do nothing */
Expand Down