Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions plugins/processor_content_modifier/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ static int cb_process_traces(struct flb_processor_instance *ins,

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "action", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str),
"Action to perform over the content: insert, upsert, delete, rename or hash."
FLB_CONFIG_MAP_STR, "context", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str),
"Context where the action will be applied."
},

{
FLB_CONFIG_MAP_STR, "context", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str),
"Context to apply the action."
FLB_CONFIG_MAP_STR, "action", NULL,
0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str),
"Action to perform over the content: insert, upsert, delete, rename or hash."
},

{
Expand Down
5 changes: 5 additions & 0 deletions plugins/processor_content_modifier/cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum {
CM_CONTEXT_LOG_METADATA,
CM_CONTEXT_LOG_BODY,

CM_CONTEXT_OTEL_RESOURCE_ATTR,
CM_CONTEXT_OTEL_SCOPE_NAME,
CM_CONTEXT_OTEL_SCOPE_VERSION,
CM_CONTEXT_OTEL_SCOPE_ATTR,

/* Metrics */
CM_CONTEXT_METRIC_NAME,
CM_CONTEXT_METRIC_DESCRIPTION,
Expand Down
88 changes: 87 additions & 1 deletion plugins/processor_content_modifier/cm_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,92 @@ static int set_context(struct content_modifier_ctx *ctx)
strcasecmp(ctx->context_str, "record") == 0) {
context = CM_CONTEXT_LOG_BODY;
}
/*
* OpenTelemetry contexts
* ----------------------
*/
else if (strcasecmp(ctx->context_str, "otel_resource_attributes") == 0) {
context = CM_CONTEXT_OTEL_RESOURCE_ATTR;
}
else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) {
/*
* scope name is restricted to specific actions, make sure the user
* cannot messed it up
*
* action allowed ?
* -----------------------------
* CM_ACTION_INSERT Yes
* CM_ACTION_UPSERT Yes
* CM_ACTION_DELETE Yes
* CM_ACTION_RENAME No
* CM_ACTION_HASH Yes
* CM_ACTION_EXTRACT No
* CM_ACTION_CONVERT No
*/

if (ctx->action_type == CM_ACTION_RENAME ||
ctx->action_type == CM_ACTION_EXTRACT ||
ctx->action_type == CM_ACTION_CONVERT) {
flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'",
ctx->action_str, ctx->context_str);
return -1;
}

/* check that 'name' is the key set */
if (!ctx->key) {
ctx->key = flb_sds_create("name");
}
else if (strcasecmp(ctx->key, "name") != 0) {
flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'name', no '%s'",
ctx->context_str, ctx->key);
return -1;
}

context = CM_CONTEXT_OTEL_SCOPE_NAME;
}
else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) {
/*
* scope version, same as the name, it's restricted to specific actions, make sure the user
* cannot messed it up
*
* action allowed ?
* -----------------------------
* CM_ACTION_INSERT Yes
* CM_ACTION_UPSERT Yes
* CM_ACTION_DELETE Yes
* CM_ACTION_RENAME No
* CM_ACTION_HASH Yes
* CM_ACTION_EXTRACT No
* CM_ACTION_CONVERT No
*/

if (ctx->action_type == CM_ACTION_RENAME ||
ctx->action_type == CM_ACTION_EXTRACT ||
ctx->action_type == CM_ACTION_CONVERT) {
flb_plg_error(ctx->ins, "action '%s' is not allowed for context '%s'",
ctx->action_str, ctx->context_str);
return -1;
}

/* check that 'version' is the key set */
if (!ctx->key) {
ctx->key = flb_sds_create("version");
}
else if (strcasecmp(ctx->key, "version") != 0) {
flb_plg_error(ctx->ins, "context '%s' requires the name of the key to be 'version', no '%s'",
ctx->context_str, ctx->key);
return -1;
}
context = CM_CONTEXT_OTEL_SCOPE_VERSION;
}
else if (strcasecmp(ctx->context_str, "otel_scope_attributes") == 0) {
context = CM_CONTEXT_OTEL_SCOPE_ATTR;
}
else if (strcasecmp(ctx->context_str, "otel_scope_name") == 0) {
}
else if (strcasecmp(ctx->context_str, "otel_scope_version") == 0) {
context = CM_CONTEXT_OTEL_SCOPE_VERSION;
}
else {
flb_plg_error(ctx->ins, "unknown logs context '%s'", ctx->context_str);
return -1;
Expand Down Expand Up @@ -177,7 +263,7 @@ static int check_action_requirements(struct content_modifier_ctx *ctx)
/* these only requires a key, already validated (useless code) */
}
else if (ctx->action_type == CM_ACTION_INSERT || ctx->action_type == CM_ACTION_UPSERT ||
ctx->action_type == CM_ACTION_RENAME) {
ctx->action_type == CM_ACTION_RENAME) {

if (!ctx->value) {
flb_plg_error(ctx->ins, "value is required for action '%s'", ctx->action_str);
Expand Down
175 changes: 175 additions & 0 deletions plugins/processor_content_modifier/cm_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -595,25 +595,200 @@ static int run_action_convert(struct content_modifier_ctx *ctx,
return 0;
}

static struct cfl_variant *otel_get_or_create_attributes(struct cfl_kvlist *kvlist)
{
int ret;
struct cfl_list *head;
struct cfl_list *tmp;
struct cfl_kvpair *kvpair;
struct cfl_variant *val;
struct cfl_kvlist *kvlist_tmp;

/* iterate resource to find the attributes field */
cfl_list_foreach_safe(head, tmp, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);
if (cfl_sds_len(kvpair->key) != 10) {
continue;
}

if (strncmp(kvpair->key, "attributes", 10) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

return val;
}
}

/* create an empty kvlist as the value of attributes */
kvlist_tmp = cfl_kvlist_create();
if (!kvlist_tmp) {
return NULL;
}

/* create the attributes kvpair */
ret = cfl_kvlist_insert_kvlist_s(kvlist, "attributes", 10, kvlist_tmp);
if (ret != 0) {
cfl_kvlist_destroy(kvlist_tmp);
return NULL;
}

/* get the last kvpair from the list */
kvpair = cfl_list_entry_last(&kvlist->list, struct cfl_kvpair, _head);
if (!kvpair) {
return NULL;
}

return kvpair->val;
}


static struct cfl_variant *otel_get_attributes(int context, struct flb_mp_chunk_record *record)
{
int key_len;
const char *key_buf;
struct cfl_list *head;
struct cfl_object *obj = NULL;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;
struct cfl_variant *var_attr;

if (context == CM_CONTEXT_OTEL_RESOURCE_ATTR) {
key_buf = "resource";
key_len = 8;
}
else if (context == CM_CONTEXT_OTEL_SCOPE_ATTR) {
key_buf = "scope";
key_len = 5;
}
else {
return NULL;
}

obj = record->cobj_record;
kvlist = obj->variant->data.as_kvlist;
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != key_len) {
continue;
}

if (strncmp(kvpair->key, key_buf, key_len) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

var_attr = otel_get_or_create_attributes(val->data.as_kvlist);
if (!var_attr) {
return NULL;
}

return var_attr;
}
}

return NULL;
}

static struct cfl_variant *otel_get_scope(struct flb_mp_chunk_record *record)
{
struct cfl_list *head;
struct cfl_object *obj;
struct cfl_variant *val;
struct cfl_kvlist *kvlist;
struct cfl_kvpair *kvpair;

obj = record->cobj_record;
kvlist = obj->variant->data.as_kvlist;
cfl_list_foreach(head, &kvlist->list) {
kvpair = cfl_list_entry(head, struct cfl_kvpair, _head);

if (cfl_sds_len(kvpair->key) != 5) {
continue;
}

if (strncmp(kvpair->key, "scope", 5) == 0) {
val = kvpair->val;
if (val->type != CFL_VARIANT_KVLIST) {
return NULL;
}

return val;
}
}

return NULL;
}
int cm_logs_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct flb_mp_chunk_cobj *chunk_cobj,
const char *tag,
int tag_len)
{
int ret = -1;
int record_type;
struct flb_mp_chunk_record *record;
struct cfl_object *obj = NULL;
struct cfl_object obj_static;
struct cfl_variant *var;

/* Iterate records */
while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) {
obj = NULL;

/* Retrieve information about the record type */
ret = flb_log_event_decoder_get_record_type(&record->event, &record_type);
if (ret != 0) {
flb_plg_error(ctx->ins, "record has invalid event type");
continue;
}

/* retrieve the target cfl object */
if (ctx->context_type == CM_CONTEXT_LOG_METADATA) {
obj = record->cobj_metadata;
}
else if (ctx->context_type == CM_CONTEXT_LOG_BODY) {
obj = record->cobj_record;
}
else if (ctx->context_type == CM_CONTEXT_OTEL_RESOURCE_ATTR &&
record_type == FLB_LOG_EVENT_GROUP_START) {
var = otel_get_attributes(CM_CONTEXT_OTEL_RESOURCE_ATTR, record);
if (!var) {
continue;
}

obj_static.type = CFL_VARIANT_KVLIST;
obj_static.variant = var;
obj = &obj_static;
}
else if (ctx->context_type == CM_CONTEXT_OTEL_SCOPE_ATTR &&
record_type == FLB_LOG_EVENT_GROUP_START) {

var = otel_get_attributes(CM_CONTEXT_OTEL_SCOPE_ATTR, record);
if (!var) {
continue;
}

obj_static.type = CFL_VARIANT_KVLIST;
obj_static.variant = var;
obj = &obj_static;
}
else if ((ctx->context_type == CM_CONTEXT_OTEL_SCOPE_NAME || ctx->context_type == CM_CONTEXT_OTEL_SCOPE_VERSION) &&
record_type == FLB_LOG_EVENT_GROUP_START) {

var = otel_get_scope(record);
obj_static.type = CFL_VARIANT_KVLIST;
obj_static.variant = var;
obj = &obj_static;
}

if (!obj) {
continue;
}

/* the operation on top of the data type is unsupported */
if (obj->variant->type != CFL_VARIANT_KVLIST) {
Expand Down