diff --git a/CMakeLists.txt b/CMakeLists.txt index d54d87e021b..420ef82803c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -284,6 +284,7 @@ option(FLB_FILTER_NIGHTFALL "Enable Nightfall filter" option(FLB_FILTER_WASM "Enable WASM filter" Yes) option(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" Yes) option(FLB_PROCESSOR_ATTRIBUTES "Enable atributes manipulation processor" Yes) +option(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" Yes) if(DEFINED FLB_NIGHTLY_BUILD AND NOT "${FLB_NIGHTLY_BUILD}" STREQUAL "") FLB_DEFINITION_VAL(FLB_NIGHTLY_BUILD ${FLB_NIGHTLY_BUILD}) diff --git a/include/fluent-bit/flb_log_event.h b/include/fluent-bit/flb_log_event.h index a257aa1ed41..086d467f43f 100644 --- a/include/fluent-bit/flb_log_event.h +++ b/include/fluent-bit/flb_log_event.h @@ -20,9 +20,9 @@ #ifndef FLB_LOG_EVENT_H #define FLB_LOG_EVENT_H +#include #include #include -#include #include diff --git a/include/fluent-bit/flb_mp.h b/include/fluent-bit/flb_mp.h index 88d249ccab4..280f4b2ecb8 100644 --- a/include/fluent-bit/flb_mp.h +++ b/include/fluent-bit/flb_mp.h @@ -21,6 +21,7 @@ #define FLB_MP_H #include +#include #define FLB_MP_MAP MSGPACK_OBJECT_MAP #define FLB_MP_ARRAY MSGPACK_OBJECT_ARRAY @@ -67,6 +68,7 @@ struct flb_mp_accessor { struct mk_list ra_list; }; + int flb_mp_map_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck); int flb_mp_map_header_append(struct flb_mp_map_header *mh); void flb_mp_map_header_end(struct flb_mp_map_header *mh); @@ -88,4 +90,7 @@ int flb_mp_accessor_set_active_by_pattern(struct flb_mp_accessor *mpa, struct cfl_object *flb_mp_object_to_cfl(msgpack_object *o); int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_size); + + + #endif diff --git a/include/fluent-bit/flb_mp_chunk.h b/include/fluent-bit/flb_mp_chunk.h new file mode 100644 index 00000000000..37685b3610b --- /dev/null +++ b/include/fluent-bit/flb_mp_chunk.h @@ -0,0 +1,63 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_MP_CHUNK_H +#define FLB_MP_CHUNK_H + +#include +#include +#include + +#define FLB_MP_CHUNK_RECORD_ERROR -1 /* Error while retrieving content */ +#define FLB_MP_CHUNK_RECORD_OK 0 /* Content retrieved successfully */ +#define FLB_MP_CHUNK_RECORD_EOF 1 /* No more content to retrieve */ + +struct flb_mp_chunk_record { + int modified; + struct flb_log_event event; + struct cfl_object *cobj_metadata; + struct cfl_object *cobj_record; + struct cfl_list _head; +}; + +struct flb_mp_chunk_cobj { + int total_records; + struct flb_log_event_encoder *log_encoder; + struct flb_log_event_decoder *log_decoder; + + struct flb_mp_chunk_record *record_pos; + struct cfl_list records; +}; + + +struct flb_mp_chunk_record *flb_mp_chunk_record_create(struct flb_mp_chunk_cobj *chunk_cobj); +int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj, + struct flb_mp_chunk_record **out_record); + +struct flb_mp_chunk_cobj *flb_mp_chunk_cobj_create(struct flb_log_event_encoder *log_encoder, + struct flb_log_event_decoder *log_decoder); +int flb_mp_chunk_cobj_destroy(struct flb_mp_chunk_cobj *chunk_cobj); + +int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_buf, size_t *out_size); + +int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj, + struct flb_mp_chunk_record **out_record); + + +#endif \ No newline at end of file diff --git a/include/fluent-bit/flb_processor.h b/include/fluent-bit/flb_processor.h index 41696a8d10b..1ad0d0ae813 100644 --- a/include/fluent-bit/flb_processor.h +++ b/include/fluent-bit/flb_processor.h @@ -138,8 +138,7 @@ struct flb_processor_plugin { struct flb_config *); int (*cb_process_logs) (struct flb_processor_instance *, - struct flb_log_event_encoder *, - struct flb_log_event *, + void *, /* struct flb_mp_chunk_cobj_create */ const char *, int); @@ -161,6 +160,7 @@ struct flb_processor_plugin { struct flb_processor_instance { int id; /* instance id */ int log_level; /* instance log level */ + int event_type; /* event type */ char name[32]; /* numbered name */ char *alias; /* alias name */ void *context; /* Instance local context */ @@ -215,6 +215,7 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str struct flb_processor_instance *flb_processor_instance_create( struct flb_config *config, + int event_type, const char *name, void *data); diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 88f2ed40da4..46295f14e5a 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -282,7 +282,7 @@ REGISTER_IN_PLUGIN("in_random") # PROCESSORS # ========== REGISTER_PROCESSOR_PLUGIN("processor_labels") -REGISTER_PROCESSOR_PLUGIN("processor_attributes") +REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") # OUTPUTS # ======= diff --git a/plugins/processor_attributes/CMakeLists.txt b/plugins/processor_attributes/CMakeLists.txt deleted file mode 100644 index db01390ed5d..00000000000 --- a/plugins/processor_attributes/CMakeLists.txt +++ /dev/null @@ -1,4 +0,0 @@ -set(src - attributes.c) - -FLB_PLUGIN(processor_attributes "${src}" "") diff --git a/plugins/processor_attributes/attributes.c b/plugins/processor_attributes/attributes.c deleted file mode 100644 index 64576a5ed0d..00000000000 --- a/plugins/processor_attributes/attributes.c +++ /dev/null @@ -1,1350 +0,0 @@ -/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ - -/* Fluent Bit - * ========== - * Copyright (C) 2015-2024 The Fluent Bit Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "variant_utils.h" - -typedef int (*attribute_transformer)(void *, struct cfl_variant *value); - -struct internal_processor_context { - struct mk_list *update_list; - struct mk_list *insert_list; - struct mk_list *upsert_list; - struct mk_list *convert_list; - struct mk_list *extract_list; - struct mk_list *delete_list; - struct mk_list *hash_list; - - /* internal attributes ready to append */ - struct cfl_list update_attributes; - struct cfl_list insert_attributes; - struct cfl_list upsert_attributes; - struct cfl_list convert_attributes; - struct cfl_list extract_attributes; - struct mk_list delete_attributes; - struct mk_list hash_attributes; - - struct flb_processor_instance *instance; - struct flb_config *config; -}; - -/* - * LOCAL - */ -static int hex_encode(unsigned char *input_buffer, - size_t input_length, - cfl_sds_t *output_buffer) -{ - const char hex[] = "0123456789abcdef"; - cfl_sds_t result; - size_t index; - - if (cfl_sds_alloc(*output_buffer) <= (input_length * 2)) { - result = cfl_sds_increase(*output_buffer, - (input_length * 2) - - cfl_sds_alloc(*output_buffer)); - - if (result == NULL) { - return FLB_FALSE; - } - - *output_buffer = result; - } - - for (index = 0; index < input_length; index++) { - (*output_buffer)[index * 2 + 0] = hex[(input_buffer[index] >> 4) & 0xF]; - (*output_buffer)[index * 2 + 1] = hex[(input_buffer[index] >> 0) & 0xF]; - } - - cfl_sds_set_len(*output_buffer, input_length * 2); - - (*output_buffer)[index * 2] = '\0'; - - return FLB_TRUE; -} - -static int process_attribute_modification_list_setting( - struct flb_processor_instance *plugin_instance, - const char *setting_name, - struct mk_list *source_list, - struct mk_list *destination_list) -{ - struct flb_config_map_val *source_entry; - struct mk_list *iterator; - int result; - - if (source_list == NULL || - mk_list_is_empty(source_list) == 0) { - - return 0; - } - - flb_config_map_foreach(iterator, source_entry, source_list) { - result = flb_slist_add(destination_list, source_entry->val.str); - - if (result != 0) { - flb_plg_error(plugin_instance, - "could not append attribute name %s\n", - source_entry->val.str); - - return -1; - } - } - - return 0; -} - -static int process_attribute_modification_kvlist_setting( - struct flb_processor_instance *plugin_instance, - const char *setting_name, - struct mk_list *source_list, - struct cfl_list *destination_list) -{ - struct cfl_kv *processed_pair; - struct flb_config_map_val *source_entry; - struct mk_list *iterator; - struct flb_slist_entry *value; - struct flb_slist_entry *key; - - if (source_list == NULL || - mk_list_is_empty(source_list) == 0) { - - return 0; - } - - flb_config_map_foreach(iterator, source_entry, source_list) { - if (mk_list_size(source_entry->val.list) != 2) { - flb_plg_error(plugin_instance, - "'%s' expects a key and a value, " - "e.g: '%s version 1.8.0'", - setting_name, setting_name); - - return -1; - } - - key = mk_list_entry_first(source_entry->val.list, - struct flb_slist_entry, _head); - - value = mk_list_entry_last(source_entry->val.list, - struct flb_slist_entry, _head); - - processed_pair = cfl_kv_item_create(destination_list, - key->str, - value->str); - - if (processed_pair == NULL) { - flb_plg_error(plugin_instance, - "could not append attribute %s=%s\n", - key->str, - value->str); - - return -1; - } - } - - return 0; -} - -static void destroy_context(struct internal_processor_context *context) -{ - if (context != NULL) { - cfl_kv_release(&context->update_attributes); - cfl_kv_release(&context->insert_attributes); - cfl_kv_release(&context->upsert_attributes); - cfl_kv_release(&context->convert_attributes); - cfl_kv_release(&context->extract_attributes); - flb_slist_destroy(&context->delete_attributes); - flb_slist_destroy(&context->hash_attributes); - - flb_free(context); - } -} - -static struct internal_processor_context * - create_context(struct flb_processor_instance *processor_instance, - struct flb_config *config) -{ - struct internal_processor_context *context; - int result; - - context = flb_calloc(1, sizeof(struct internal_processor_context)); - - if (context != NULL) { - context->instance = processor_instance; - context->config = config; - - cfl_kv_init(&context->update_attributes); - cfl_kv_init(&context->insert_attributes); - cfl_kv_init(&context->upsert_attributes); - cfl_kv_init(&context->convert_attributes); - cfl_kv_init(&context->extract_attributes); - flb_slist_create(&context->delete_attributes); - flb_slist_create(&context->hash_attributes); - - result = flb_processor_instance_config_map_set(processor_instance, (void *) context); - - if (result == 0) { - result = process_attribute_modification_kvlist_setting( - processor_instance, - "update", - context->update_list, - &context->update_attributes); - } - - if (result == 0) { - result = process_attribute_modification_kvlist_setting( - processor_instance, - "insert", - context->insert_list, - &context->insert_attributes); - } - - if (result == 0) { - result = process_attribute_modification_kvlist_setting( - processor_instance, - "convert", - context->convert_list, - &context->convert_attributes); - } - - if (result == 0) { - result = process_attribute_modification_kvlist_setting( - processor_instance, - "extract", - context->extract_list, - &context->extract_attributes); - } - - if (result == 0) { - result = process_attribute_modification_kvlist_setting( - processor_instance, - "upsert", - context->upsert_list, - &context->upsert_attributes); - } - - if (result == 0) { - result = process_attribute_modification_list_setting( - processor_instance, - "delete", - context->delete_list, - &context->delete_attributes); - } - - if (result == 0) { - result = process_attribute_modification_list_setting( - processor_instance, - "hash", - context->hash_list, - &context->hash_attributes); - } - - if (result != 0) { - destroy_context(context); - - context = NULL; - } - } - else { - flb_errno(); - } - - return context; -} - -static int cb_init(struct flb_processor_instance *processor_instance, - void *source_plugin_instance, - int source_plugin_type, - struct flb_config *config) -{ - processor_instance->context = (void *) create_context( - processor_instance, config); - - if (processor_instance->context == NULL) { - return FLB_PROCESSOR_FAILURE; - } - - return FLB_PROCESSOR_SUCCESS; -} - - -static int cb_exit(struct flb_processor_instance *processor_instance) -{ - if (processor_instance != NULL && - processor_instance->context != NULL) { - destroy_context(processor_instance->context); - } - - return FLB_PROCESSOR_SUCCESS; -} - - -/* local declarations */ -static cfl_sds_t cfl_variant_convert_to_json(struct cfl_variant *value) -{ - cfl_sds_t json_result; - mpack_writer_t writer; - char *data; - size_t size; - - data = NULL; - size = 0; - - mpack_writer_init_growable(&writer, &data, &size); - - pack_cfl_variant(&writer, value); - - mpack_writer_destroy(&writer); - - json_result = flb_msgpack_raw_to_json_sds(data, size); - - return json_result; -} - - - -static int cfl_variant_convert(struct cfl_variant *input_value, - struct cfl_variant **output_value, - int output_type) -{ - char *converstion_canary; - struct cfl_variant temporary_value; - int errno_backup; - - errno_backup = errno; - *output_value = cfl_variant_create(); - - memset(&temporary_value, 0, sizeof(struct cfl_variant)); - - temporary_value.type = output_type; - - if (input_value->type == CFL_VARIANT_STRING || - input_value->type == CFL_VARIANT_BYTES || - input_value->type == CFL_VARIANT_REFERENCE) { - if (output_type == CFL_VARIANT_STRING || - output_type == CFL_VARIANT_BYTES) { - temporary_value.data.as_string = - cfl_sds_create_len( - input_value->data.as_string, - cfl_sds_len(input_value->data.as_string)); - - if (temporary_value.data.as_string == NULL) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - } - else if (output_type == CFL_VARIANT_BOOL) { - temporary_value.data.as_bool = CFL_FALSE; - - if (strcasecmp(input_value->data.as_string, "true") == 0) { - temporary_value.data.as_bool = CFL_TRUE; - } - else if (strcasecmp(input_value->data.as_string, "off") == 0) { - temporary_value.data.as_bool = CFL_TRUE; - } - } - else if (output_type == CFL_VARIANT_INT) { - errno = 0; - temporary_value.data.as_int64 = strtoimax(input_value->data.as_string, - &converstion_canary, - 10); - - if (errno == ERANGE || errno == EINVAL) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - errno = errno_backup; - - return CFL_FALSE; - } - } - else if (output_type == CFL_VARIANT_DOUBLE) { - errno = 0; - converstion_canary = NULL; - temporary_value.data.as_double = strtod(input_value->data.as_string, - &converstion_canary); - - if (errno == ERANGE) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - errno = errno_backup; - - return CFL_FALSE; - } - else if (temporary_value.data.as_double == 0 && - converstion_canary == input_value->data.as_string) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - errno = errno_backup; - - return CFL_FALSE; - } - } - else if (output_type == CFL_VARIANT_ARRAY) { - temporary_value.data.as_array = cfl_array_create(1); - - if (temporary_value.data.as_array == NULL) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - - if (cfl_array_append_bytes(temporary_value.data.as_array, - input_value->data.as_bytes, - cfl_sds_len(input_value->data.as_bytes)) != 0) { - cfl_array_destroy(temporary_value.data.as_array); - - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - - temporary_value.data.as_array->entries[0]->type = output_type; - } - else { - return CFL_FALSE; - } - } - else if (input_value->type == CFL_VARIANT_INT) { - if (output_type == CFL_VARIANT_STRING || - output_type == CFL_VARIANT_BYTES) { - temporary_value.data.as_string = cfl_sds_create_size(64); - - if (temporary_value.data.as_string == NULL) { - return CFL_FALSE; - } - - /* We need to fix the wesleys truncation PR to cfl */ - converstion_canary = (char *) cfl_sds_printf( - &temporary_value.data.as_string, - "%" PRIi64, - input_value->data.as_int64); - - if (converstion_canary == NULL) { - cfl_sds_destroy(temporary_value.data.as_string); - - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - } - else if (output_type == CFL_VARIANT_BOOL) { - temporary_value.data.as_bool = CFL_FALSE; - - if (input_value->data.as_int64 != 0) { - temporary_value.data.as_bool = CFL_TRUE; - } - } - else if (output_type == CFL_VARIANT_INT) { - temporary_value.data.as_int64 = input_value->data.as_int64; - } - else if (output_type == CFL_VARIANT_DOUBLE) { - temporary_value.data.as_double = (double) input_value->data.as_int64; - - /* This conversion could be lossy, we need to determine what we want to - * do in that case - */ - if ((int64_t) temporary_value.data.as_double != input_value->data.as_int64) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - } - else if (output_type == CFL_VARIANT_ARRAY) { - temporary_value.data.as_array = cfl_array_create(1); - - if (temporary_value.data.as_array == NULL) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - - if (cfl_array_append_int64(temporary_value.data.as_array, - input_value->data.as_int64) != 0) { - cfl_array_destroy(temporary_value.data.as_array); - - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - } - else { - return CFL_FALSE; - } - } - else if (input_value->type == CFL_VARIANT_DOUBLE) { - if (output_type == CFL_VARIANT_STRING || - output_type == CFL_VARIANT_BYTES) { - temporary_value.data.as_string = cfl_sds_create_size(64); - - if (temporary_value.data.as_string == NULL) { - return CFL_FALSE; - } - - /* We need to fix the wesleys truncation PR to cfl */ - converstion_canary = (char *) cfl_sds_printf( - &temporary_value.data.as_string, - "%.17g", - input_value->data.as_double); - - if (converstion_canary == NULL) { - cfl_sds_destroy(temporary_value.data.as_string); - - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - } - else if (output_type == CFL_VARIANT_BOOL) { - temporary_value.data.as_bool = CFL_FALSE; - - if (input_value->data.as_double != 0) { - temporary_value.data.as_bool = CFL_TRUE; - } - } - else if (output_type == CFL_VARIANT_INT) { - temporary_value.data.as_int64 = (int64_t) round(input_value->data.as_double); - } - else if (output_type == CFL_VARIANT_DOUBLE) { - temporary_value.data.as_double = input_value->data.as_int64; - } - else if (output_type == CFL_VARIANT_ARRAY) { - temporary_value.data.as_array = cfl_array_create(1); - - if (temporary_value.data.as_array == NULL) { - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - - if (cfl_array_append_double(temporary_value.data.as_array, - input_value->data.as_double) != 0) { - cfl_array_destroy(temporary_value.data.as_array); - - cfl_variant_destroy(*output_value); - *output_value = NULL; - - return CFL_FALSE; - } - } - else { - return CFL_FALSE; - } - } - else if (input_value->type == CFL_VARIANT_KVLIST) { - if (output_type == CFL_VARIANT_STRING || - output_type == CFL_VARIANT_BYTES) { - temporary_value.data.as_string = cfl_variant_convert_to_json(input_value); - - if (temporary_value.data.as_string == NULL) { - return CFL_FALSE; - } - } - else { - return CFL_FALSE; - } - } - else if (input_value->type == CFL_VARIANT_ARRAY) { - if (output_type == CFL_VARIANT_STRING || - output_type == CFL_VARIANT_BYTES) { - temporary_value.data.as_string = cfl_variant_convert_to_json(input_value); - - if (temporary_value.data.as_string == NULL) { - return CFL_FALSE; - } - } - else { - return CFL_FALSE; - } - } - - memcpy(*output_value, &temporary_value, sizeof(struct cfl_variant)); - - return FLB_TRUE; -} - -static int span_contains_attribute(struct ctrace_span *span, - char *name) -{ - if (span->attr == NULL) { - return FLB_FALSE; - } - - return cfl_kvlist_contains(span->attr->kv, name); -} - -static int span_remove_attribute(struct ctrace_span *span, - char *name) -{ - if (span->attr == NULL) { - return FLB_FALSE; - } - - return cfl_kvlist_remove(span->attr->kv, name); -} - -static int span_update_attribute(struct ctrace_span *span, - char *name, - char *value) -{ - if (span->attr == NULL) { - return FLB_FALSE; - } - - cfl_kvlist_remove(span->attr->kv, name); - - if (ctr_span_set_attribute_string(span, name, value) != 0) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static int span_insert_attribute(struct ctrace_span *span, - char *name, - char *value) -{ - if (span->attr == NULL) { - return FLB_FALSE; - } - - if (ctr_span_set_attribute_string(span, name, value) != 0) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static int span_transform_attribute(struct ctrace_span *span, - char *name, - attribute_transformer transformer) -{ - struct cfl_variant *attribute; - - if (span->attr == NULL) { - return FLB_FALSE; - } - - attribute = cfl_kvlist_fetch(span->attr->kv, name); - - if (attribute == NULL) { - return FLB_FALSE; - } - - return transformer(NULL, attribute); -} - -static int span_convert_attribute(struct ctrace_span *span, - char *name, - char *new_type) -{ - struct cfl_variant *converted_attribute; - int new_type_constant; - struct cfl_variant *attribute; - int result; - - if (strcasecmp(new_type, "string") == 0 || - strcasecmp(new_type, "str") == 0) { - new_type_constant = CFL_VARIANT_STRING; - } - else if (strcasecmp(new_type, "bytes") == 0) { - new_type_constant = CFL_VARIANT_BYTES; - } - else if (strcasecmp(new_type, "boolean") == 0 || - strcasecmp(new_type, "bool") == 0) { - new_type_constant = CFL_VARIANT_BOOL; - } - else if (strcasecmp(new_type, "integer") == 0 || - strcasecmp(new_type, "int64") == 0 || - strcasecmp(new_type, "int") == 0) { - new_type_constant = CFL_VARIANT_INT; - } - else if (strcasecmp(new_type, "double") == 0 || - strcasecmp(new_type, "dbl") == 0) { - new_type_constant = CFL_VARIANT_DOUBLE; - } - else if (strcasecmp(new_type, "array") == 0) { - new_type_constant = CFL_VARIANT_ARRAY; - } - else { - return FLB_FALSE; - } - - if (span->attr == NULL) { - return FLB_FALSE; - } - - attribute = cfl_kvlist_fetch(span->attr->kv, name); - - if (attribute == NULL) { - return FLB_FALSE; - } - - result = cfl_variant_convert(attribute, - &converted_attribute, - new_type_constant); - - if (result != FLB_TRUE) { - return FLB_FALSE; - } - - result = cfl_kvlist_remove(span->attr->kv, name); - - if (result != FLB_TRUE) { - return FLB_FALSE; - } - - - result = cfl_kvlist_insert(span->attr->kv, name, converted_attribute); - - if (result != 0) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static void attribute_match_cb(const char *name, - const char *value, - size_t value_length, - void *context) -{ - cfl_sds_t temporary_value; - struct ctrace_span *span; - - temporary_value = cfl_sds_create_len(value, value_length); - - if (temporary_value != NULL) { - span = (struct ctrace_span *) context; - - if (span_contains_attribute(span, (char *) name) == FLB_TRUE) { - span_remove_attribute(span, (char *) name); - } - - ctr_span_set_attribute_string(span, (char *) name, temporary_value); - - cfl_sds_destroy(temporary_value); - } -} - -static int span_extract_attributes(struct ctrace_span *span, - char *name, - char *pattern) -{ - ssize_t match_count; - struct flb_regex_search match_list; - struct cfl_variant *attribute; - int result; - struct flb_regex *regex; - - regex = flb_regex_create(pattern); - - if (regex == NULL) { - return FLB_FALSE; - } - - attribute = cfl_kvlist_fetch(span->attr->kv, name); - - if (attribute == NULL) { - flb_regex_destroy(regex); - - return FLB_FALSE; - } - - - if (attribute->type != CFL_VARIANT_STRING) { - flb_regex_destroy(regex); - - return FLB_FALSE; - } - - match_count = flb_regex_do(regex, - attribute->data.as_string, - cfl_sds_len(attribute->data.as_string), - &match_list); - - if (match_count <= 0) { - flb_regex_destroy(regex); - - return FLB_FALSE; - } - - - result = flb_regex_parse(regex, - &match_list, - attribute_match_cb, - (void *) span); - - flb_regex_destroy(regex); - - if (result == -1) { - return FLB_FALSE; - } - - return FLB_TRUE; -} - -static int traces_context_contains_attribute(struct ctrace *traces_context, - char *name) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - return FLB_TRUE; - } - } - - return FLB_FALSE; -} - -static int hash_transformer(void *context, struct cfl_variant *value) -{ - unsigned char digest_buffer[32]; - struct cfl_variant *converted_value; - cfl_sds_t encoded_hash; - int result; - - if (value == NULL) { - return FLB_FALSE; - } - - result = cfl_variant_convert(value, - &converted_value, - CFL_VARIANT_STRING); - - if (result != FLB_TRUE) { - return FLB_FALSE; - } - - if (cfl_sds_len(converted_value->data.as_string) == 0) { - cfl_variant_destroy(converted_value); - - return FLB_TRUE; - } - - result = flb_hash_simple(FLB_HASH_SHA256, - (unsigned char *) converted_value->data.as_string, - cfl_sds_len(converted_value->data.as_string), - digest_buffer, - sizeof(digest_buffer)); - - if (result != FLB_CRYPTO_SUCCESS) { - cfl_variant_destroy(converted_value); - - return FLB_FALSE; - } - - result = hex_encode(digest_buffer, - sizeof(digest_buffer), - &converted_value->data.as_string); - - if (result != FLB_TRUE) { - cfl_variant_destroy(converted_value); - - return FLB_FALSE; - } - - encoded_hash = cfl_sds_create(converted_value->data.as_string); - - if (encoded_hash == NULL) { - cfl_variant_destroy(converted_value); - - return FLB_FALSE; - } - - if (value->type == CFL_VARIANT_STRING || - value->type == CFL_VARIANT_BYTES) { - cfl_sds_destroy(value->data.as_string); - } - else if (value->type == CFL_VARIANT_ARRAY) { - cfl_array_destroy(value->data.as_array); - } - else if (value->type == CFL_VARIANT_KVLIST) { - cfl_kvlist_destroy(value->data.as_kvlist); - } - - value->type = CFL_VARIANT_STRING; - value->data.as_string = encoded_hash; - - return FLB_TRUE; -} - -static int traces_context_hash_attribute(struct ctrace *traces_context, - char *name) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - if (span_transform_attribute(span, name, hash_transformer) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int traces_context_remove_attribute(struct ctrace *traces_context, - char *name) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - if (span_remove_attribute(span, name) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int traces_context_update_attribute(struct ctrace *traces_context, - char *name, - char *value) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - if (span_update_attribute(span, name, value) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int traces_context_insert_attribute(struct ctrace *traces_context, - char *name, - char *value) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (!span_contains_attribute(span, name) == FLB_TRUE) { - if (span_insert_attribute(span, name, value) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int traces_context_upsert_attribute(struct ctrace *traces_context, - char *name, - char *value) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - if (span_update_attribute(span, name, value) != FLB_TRUE) { - return FLB_FALSE; - } - } - else { - if (span_insert_attribute(span, name, value) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int traces_context_convert_attribute(struct ctrace *traces_context, - char *name, - char *new_type) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - if (span_convert_attribute(span, name, new_type) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int traces_context_extract_attribute(struct ctrace *traces_context, - char *name, - char *pattern) -{ - struct cfl_list *iterator; - struct ctrace_span *span; - - cfl_list_foreach(iterator, &traces_context->span_list) { - span = cfl_list_entry(iterator, - struct ctrace_span, _head_global); - - if (span_contains_attribute(span, name) == FLB_TRUE) { - if (span_extract_attributes(span, name, pattern) != FLB_TRUE) { - return FLB_FALSE; - } - } - } - - return FLB_TRUE; -} - -static int delete_attributes(struct ctrace *traces_context, - struct mk_list *attributes) -{ - struct mk_list *iterator; - int result; - struct flb_slist_entry *entry; - - mk_list_foreach(iterator, attributes) { - entry = mk_list_entry(iterator, struct flb_slist_entry, _head); - - result = traces_context_contains_attribute(traces_context, - entry->str); - - if (result == FLB_TRUE) { - result = traces_context_remove_attribute(traces_context, - entry->str); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int update_attributes(struct ctrace *traces_context, - struct cfl_list *attributes) -{ - struct cfl_list *iterator; - int result; - struct cfl_kv *pair; - - cfl_list_foreach(iterator, attributes) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); - - result = traces_context_update_attribute(traces_context, - pair->key, - pair->val); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int upsert_attributes(struct ctrace *traces_context, - struct cfl_list *attributes) -{ - struct cfl_list *iterator; - int result; - struct cfl_kv *pair; - - cfl_list_foreach(iterator, attributes) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); - - result = traces_context_upsert_attribute(traces_context, - pair->key, - pair->val); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int convert_attributes(struct ctrace *traces_context, - struct cfl_list *attributes) -{ - struct cfl_list *iterator; - int result; - struct cfl_kv *pair; - - cfl_list_foreach(iterator, attributes) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); - - result = traces_context_convert_attribute(traces_context, - pair->key, - pair->val); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int extract_attributes(struct ctrace *traces_context, - struct cfl_list *attributes) -{ - struct cfl_list *iterator; - int result; - struct cfl_kv *pair; - - cfl_list_foreach(iterator, attributes) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); - - result = traces_context_extract_attribute(traces_context, - pair->key, - pair->val); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int insert_attributes(struct ctrace *traces_context, - struct cfl_list *attributes) -{ - struct cfl_list *iterator; - int result; - struct cfl_kv *pair; - - cfl_list_foreach(iterator, attributes) { - pair = cfl_list_entry(iterator, struct cfl_kv, _head); - - result = traces_context_insert_attribute(traces_context, - pair->key, - pair->val); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int hash_attributes(struct ctrace *traces_context, - struct mk_list *attributes) -{ - struct mk_list *iterator; - int result; - struct flb_slist_entry *entry; - - mk_list_foreach(iterator, attributes) { - entry = mk_list_entry(iterator, struct flb_slist_entry, _head); - - result = traces_context_contains_attribute(traces_context, - entry->str); - - if (result == FLB_TRUE) { - result = traces_context_hash_attribute(traces_context, - entry->str); - - if (result == FLB_FALSE) { - return FLB_PROCESSOR_FAILURE; - } - } - } - - return FLB_PROCESSOR_SUCCESS; -} - -static int cb_process_traces(struct flb_processor_instance *processor_instance, - struct ctrace *traces_context, - const char *tag, - int tag_len) -{ - struct internal_processor_context *processor_context; - int result; - - processor_context = - (struct internal_processor_context *) processor_instance->context; - - result = delete_attributes(traces_context, - &processor_context->delete_attributes); - - if (result == FLB_PROCESSOR_SUCCESS) { - result = update_attributes(traces_context, - &processor_context->update_attributes); - } - - if (result == FLB_PROCESSOR_SUCCESS) { - result = upsert_attributes(traces_context, - &processor_context->upsert_attributes); - } - - if (result == FLB_PROCESSOR_SUCCESS) { - result = insert_attributes(traces_context, - &processor_context->insert_attributes); - } - - if (result == FLB_PROCESSOR_SUCCESS) { - result = convert_attributes(traces_context, - &processor_context->convert_attributes); - result = FLB_PROCESSOR_SUCCESS; - } - - if (result == FLB_PROCESSOR_SUCCESS) { - result = extract_attributes(traces_context, - &processor_context->extract_attributes); - } - - if (result == FLB_PROCESSOR_SUCCESS) { - result = hash_attributes(traces_context, - &processor_context->hash_attributes); - } - - if (result != FLB_PROCESSOR_SUCCESS) { - return FLB_PROCESSOR_FAILURE; - } - - return FLB_PROCESSOR_SUCCESS; -} - -static struct flb_config_map config_map[] = { - { - FLB_CONFIG_MAP_SLIST_1, "update", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - update_list), - "Updates an attribute. Usage : 'update name value'" - }, - { - FLB_CONFIG_MAP_SLIST_1, "insert", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - insert_list), - "Inserts an attribute. Usage : 'insert name value'" - }, - { - FLB_CONFIG_MAP_SLIST_1, "upsert", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - upsert_list), - "Inserts or updates an attribute. Usage : 'upsert name value'" - }, - { - FLB_CONFIG_MAP_SLIST_1, "convert", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - convert_list), - "Converts an attribute. Usage : 'convert name new_type'" - }, - { - FLB_CONFIG_MAP_SLIST_1, "extract", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - extract_list), - "Extracts regular expression match groups as individual attributes. Usage : 'extract (?P[^ ]*) (?P[^ ]*)'" - }, - { - FLB_CONFIG_MAP_STR, "delete", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - delete_list), - "Deletes an attribute. Usage : 'delete name'" - }, - { - FLB_CONFIG_MAP_STR, "hash", NULL, - FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct internal_processor_context, - hash_list), - "Replaces an attributes value with its SHA256 hash. Usage : 'hash name'" - }, - - /* EOF */ - {0} -}; - -struct flb_processor_plugin processor_attributes_plugin = { - .name = "attributes", - .description = "Modifies metrics attributes", - .cb_init = cb_init, - .cb_process_logs = NULL, - .cb_process_metrics = NULL, - .cb_process_traces = cb_process_traces, - .cb_exit = cb_exit, - .config_map = config_map, - .flags = 0 -}; diff --git a/plugins/processor_content_modifier/CMakeLists.txt b/plugins/processor_content_modifier/CMakeLists.txt new file mode 100644 index 00000000000..d3ea9d24a10 --- /dev/null +++ b/plugins/processor_content_modifier/CMakeLists.txt @@ -0,0 +1,8 @@ +set(src + cm_config.c + cm_logs.c + cm_traces.c + cm.c + ) + +FLB_PLUGIN(processor_content_modifier "${src}" "") diff --git a/plugins/processor_content_modifier/cm.c b/plugins/processor_content_modifier/cm.c new file mode 100644 index 00000000000..f4e127595d8 --- /dev/null +++ b/plugins/processor_content_modifier/cm.c @@ -0,0 +1,151 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cm.h" +#include "cm_config.h" + +static int cb_init(struct flb_processor_instance *ins, void *source_plugin_instance, + int source_plugin_type, struct flb_config *config) +{ + struct content_modifier_ctx *ctx; + + ctx = cm_config_create(ins, config); + if (!ctx) { + return -1; + } + + flb_processor_instance_set_context(ins, ctx); + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_exit(struct flb_processor_instance *ins) +{ + struct content_modifier_ctx *ctx; + + if (!ins) { + return FLB_PROCESSOR_SUCCESS; + } + + ctx = ins->context; + if (ctx) { + cm_config_destroy(ctx); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int cb_process_logs(struct flb_processor_instance *ins, + struct flb_mp_chunk_cobj *chunk_cobj, + const char *tag, + int tag_len) +{ + int ret; + struct content_modifier_ctx *ctx; + + if (!ins->context) { + return FLB_PROCESSOR_FAILURE; + } + ctx = ins->context; + + ret = cm_logs_process(ins, ctx, chunk_cobj, tag, tag_len); + return ret; + +} + +static int cb_process_traces(struct flb_processor_instance *ins, + struct ctrace *traces_context, + const char *tag, + int tag_len) +{ + int ret; + struct content_modifier_ctx *ctx; + + if (!ins->context) { + return FLB_PROCESSOR_FAILURE; + } + ctx = ins->context; + + ret = cm_traces_process(ins, ctx, traces_context, tag, tag_len); + return ret; + +} + +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "action", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, action_str), + "Action to perform over the content: insert, upsert, delete, rename or hash." + }, + + { + FLB_CONFIG_MAP_STR, "context", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, context_str), + "Context to apply the action." + }, + + { + FLB_CONFIG_MAP_STR, "key", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, key), + "Key to apply the action." + }, + + { + FLB_CONFIG_MAP_STR, "value", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, value), + "Value to apply the action." + }, + + { + FLB_CONFIG_MAP_STR, "pattern", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, pattern), + "Pattern used to create a regular expression." + }, + + { + FLB_CONFIG_MAP_STR, "converted_type", NULL, + 0, FLB_TRUE, offsetof(struct content_modifier_ctx, converted_type_str), + "Specify the data type to convert to, allowed values are: int, double or string." + }, + + /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_content_modifier_plugin = { + .name = "content_modifier", + .description = "Modify the content of Logs, Metrics and Traces", + .cb_init = cb_init, + .cb_process_logs = cb_process_logs, + .cb_process_metrics = NULL, + .cb_process_traces = cb_process_traces, + .cb_exit = cb_exit, + .config_map = config_map, + .flags = 0 +}; diff --git a/plugins/processor_content_modifier/cm.h b/plugins/processor_content_modifier/cm.h new file mode 100644 index 00000000000..0fba1029c82 --- /dev/null +++ b/plugins/processor_content_modifier/cm.h @@ -0,0 +1,115 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_PROCESSOR_CONTENT_MODIFIER_H +#define FLB_PROCESSOR_CONTENT_MODIFIER_H + +#include +#include + +enum { + CM_TELEMETRY_LOGS = 1, + CM_TELEMETRY_METRICS, + CM_TELEMETRY_TRACES +}; + +/* Actions that can be applied */ +enum { + CM_ACTION_INSERT = 1, + CM_ACTION_UPSERT, + CM_ACTION_DELETE, + CM_ACTION_RENAME, + CM_ACTION_HASH, + CM_ACTION_EXTRACT, + CM_ACTION_CONVERT +}; + +/* Telemetry contexts */ +enum { + CM_CONTEXT_UNDEFINED = 0, + + /* Logs */ + CM_CONTEXT_LOG_METADATA, + CM_CONTEXT_LOG_BODY, + + /* Metrics */ + CM_CONTEXT_METRIC_NAME, + CM_CONTEXT_METRIC_DESCRIPTION, + CM_CONTEXT_METRIC_LABELS, + + /* Traces */ + CM_CONTEXT_TRACE_SPAN_NAME, + CM_CONTEXT_TRACE_SPAN_KIND, + CM_CONTEXT_TRACE_SPAN_STATUS, + CM_CONTEXT_TRACE_SPAN_ATTRIBUTES, +}; + +struct cm_actions { + /* + * Based on the type, we either register a key/value pair or a + * single string value + */ + union { + struct cfl_kv *kv; + cfl_sds_t str; + } value; + + /* Link to struct proc_attr_rules->rules */ + struct cfl_list _head; +}; + +struct content_modifier_ctx { + int telemetry_type; + + /* Type of action (e.g. ..._ACTION_DELETE, ..._ACTION_INSERT )*/ + int action_type; + + /* Context where the action is applied */ + int context_type; + + /* CFL_VARIANT numerical type representation of converted_type_str */ + int converted_type; + + /* public configuration properties */ + flb_sds_t action_str; /* converted to action_type */ + flb_sds_t context_str; /* converted to context_type */ + flb_sds_t pattern; /* pattern to create 'regex' context */ + flb_sds_t converted_type_str; /* converted_type */ + flb_sds_t key; /* target key */ + flb_sds_t value; /* used for any value */ + struct flb_regex *regex; /* regular expression context created from 'pattern' */ + + /* processor instance reference */ + struct flb_processor_instance *ins; +}; + +/* Export telemetry functions */ +int cm_logs_process(struct flb_processor_instance *ins, + struct content_modifier_ctx *ctx, + struct flb_mp_chunk_cobj *chunk_cobj, + const char *tag, + int tag_len); + +int cm_traces_process(struct flb_processor_instance *ins, + struct content_modifier_ctx *ctx, + struct ctrace *traces_context, + const char *tag, int tag_len); + + +#endif diff --git a/plugins/processor_content_modifier/cm_config.c b/plugins/processor_content_modifier/cm_config.c new file mode 100644 index 00000000000..cf46156eb3a --- /dev/null +++ b/plugins/processor_content_modifier/cm_config.c @@ -0,0 +1,276 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "cm.h" + +static int set_action(struct content_modifier_ctx *ctx) +{ + if (strcasecmp(ctx->action_str, "insert") == 0) { + ctx->action_type = CM_ACTION_INSERT; + } + else if (strcasecmp(ctx->action_str, "upsert") == 0) { + ctx->action_type = CM_ACTION_UPSERT; + } + else if (strcasecmp(ctx->action_str, "delete") == 0) { + ctx->action_type = CM_ACTION_DELETE; + } + else if (strcasecmp(ctx->action_str, "rename") == 0) { + ctx->action_type = CM_ACTION_RENAME; + } + else if (strcasecmp(ctx->action_str, "hash") == 0) { + ctx->action_type = CM_ACTION_HASH; + } + else if (strcasecmp(ctx->action_str, "extract") == 0) { + ctx->action_type = CM_ACTION_EXTRACT; + } + else if (strcasecmp(ctx->action_str, "convert") == 0) { + ctx->action_type = CM_ACTION_CONVERT; + } + else { + flb_plg_error(ctx->ins, "unknown action '%s'", ctx->action_str); + return -1; + } + + return 0; +} + +static int set_converted_type(struct content_modifier_ctx *ctx) +{ + int type = -1; + + if (!ctx->converted_type_str) { + ctx->converted_type = -1; + } + + if (strcasecmp(ctx->converted_type_str, "string") == 0) { + type = CFL_VARIANT_STRING; + } + else if (strcasecmp(ctx->converted_type_str, "boolean") == 0) { + type = CFL_VARIANT_BOOL; + } + else if (strcasecmp(ctx->converted_type_str, "int") == 0) { + type = CFL_VARIANT_INT; + } + else if (strcasecmp(ctx->converted_type_str, "double") == 0) { + type = CFL_VARIANT_DOUBLE; + } + else { + flb_plg_error(ctx->ins, "unsupported converted_type '%s'", ctx->converted_type_str); + return -1; + } + + ctx->converted_type = type; + return 0; +} + +static int set_context(struct content_modifier_ctx *ctx) +{ + int context = CM_CONTEXT_UNDEFINED; + int event_type; + + /* The event type is set on the processor instance (coming from the proceesor_unit), + * basically we need to know if this is being invoked for what type of telemetry + * data. + */ + event_type = ctx->ins->event_type; + + /* + * Based on the action type, the action can be applied only to + * specific context of the Telemetry type. + */ + if (event_type == FLB_PROCESSOR_LOGS) { + if (ctx->context_str == NULL) { + /* if no context is set, use the log body */ + context = CM_CONTEXT_LOG_BODY; + } + else if (strcasecmp(ctx->context_str, "metadata") == 0 || + strcasecmp(ctx->context_str, "attributes") == 0) { + context = CM_CONTEXT_LOG_METADATA; + } + else if (strcasecmp(ctx->context_str, "body") == 0 || + strcasecmp(ctx->context_str, "message") == 0 || + strcasecmp(ctx->context_str, "record") == 0) { + context = CM_CONTEXT_LOG_BODY; + } + else { + flb_plg_error(ctx->ins, "unknown logs context '%s'", ctx->context_str); + return -1; + } + } + else if (event_type == FLB_PROCESSOR_METRICS) { + if (ctx->context_str == NULL) { + /* if no context is set, use labels */ + context = CM_CONTEXT_METRIC_LABELS; + } + else if (strcasecmp(ctx->context_str, "name") == 0) { + context = CM_CONTEXT_METRIC_NAME; + } + else if (strcasecmp(ctx->context_str, "description") == 0) { + context = CM_CONTEXT_METRIC_DESCRIPTION; + } + else if (strcasecmp(ctx->context_str, "labels") == 0 || + strcasecmp(ctx->context_str, "attributes") == 0) { + context = CM_CONTEXT_METRIC_LABELS; + } + else { + flb_plg_error(ctx->ins, "unknown metrics context '%s'", ctx->context_str); + return -1; + } + } + else if (event_type == FLB_PROCESSOR_TRACES) { + if (ctx->context_str == NULL) { + /* if no context is set, use span attributes */ + context = CM_CONTEXT_TRACE_SPAN_ATTRIBUTES; + } + else if (strcasecmp(ctx->context_str, "span_name") == 0) { + context = CM_CONTEXT_TRACE_SPAN_NAME; + } + else if (strcasecmp(ctx->context_str, "span_kind") == 0) { + context = CM_CONTEXT_TRACE_SPAN_KIND; + } + else if (strcasecmp(ctx->context_str, "span_status") == 0) { + context = CM_CONTEXT_TRACE_SPAN_STATUS; + } + else if (strcasecmp(ctx->context_str, "span_attributes") == 0) { + context = CM_CONTEXT_TRACE_SPAN_ATTRIBUTES; + } + else { + flb_plg_error(ctx->ins, "unknown traces context '%s'", ctx->context_str); + return -1; + } + } + + ctx->context_type = context; + return 0; +} + +static int check_action_requirements(struct content_modifier_ctx *ctx) +{ + int ret; + + if (!ctx->key) { + flb_plg_error(ctx->ins, "key is required for action '%s'", ctx->action_str); + return -1; + } + + if (ctx->action_type == CM_ACTION_DELETE || ctx->action_type == CM_ACTION_HASH) { + /* these only requires a key, already validated (useless code) */ + } + else if (ctx->action_type == CM_ACTION_INSERT || ctx->action_type == CM_ACTION_UPSERT || + ctx->action_type == CM_ACTION_RENAME) { + + if (!ctx->value) { + flb_plg_error(ctx->ins, "value is required for action '%s'", ctx->action_str); + return -1; + } + } + else if (ctx->action_type == CM_ACTION_EXTRACT) { + if (!ctx->pattern) { + flb_plg_error(ctx->ins, "for 'extract' action, a regular expression in 'pattern' is required"); + return -1; + } + } + else if (ctx->action_type == CM_ACTION_CONVERT) { + if (!ctx->converted_type_str) { + flb_plg_error(ctx->ins, "converted_type is required for action '%s'", ctx->action_str); + return -1; + } + + ret = set_converted_type(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "cannot set converted_type '%s'", ctx->converted_type_str); + return -1; + } + } + + return 0; +} +struct content_modifier_ctx *cm_config_create(struct flb_processor_instance *ins, + struct flb_config *config) + +{ + int ret; + struct content_modifier_ctx *ctx; + + /* Create plugin instance context */ + ctx = flb_calloc(1, sizeof(struct content_modifier_ctx)); + if (!ctx) { + flb_errno(); + return NULL; + } + ctx->ins = ins; + + /* Initialize the config map */ + ret = flb_processor_instance_config_map_set(ins, ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + if (!ctx->action_str) { + flb_plg_error(ctx->ins, "no 'action' defined"); + flb_free(ctx); + return NULL; + } + + /* process the 'action' configuration */ + ret = set_action(ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* process the 'context' where the action will be applied */ + ret = set_context(ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + + /* Pattern */ + if (ctx->pattern) { + ctx->regex = flb_regex_create(ctx->pattern); + if (!ctx->regex) { + flb_plg_error(ctx->ins, "invalid regex pattern '%s'", ctx->pattern); + flb_free(ctx); + return NULL; + } + } + + /* Certain actions needs extra configuration, e.g: insert -> requires a key and a value */ + ret = check_action_requirements(ctx); + if (ret == -1) { + flb_free(ctx); + return NULL; + } + return ctx; +} + +void cm_config_destroy(struct content_modifier_ctx *ctx) +{ + if (ctx->regex) { + flb_regex_destroy(ctx->regex); + } + + flb_free(ctx); +} diff --git a/plugins/processor_content_modifier/cm_config.h b/plugins/processor_content_modifier/cm_config.h new file mode 100644 index 00000000000..52f6475f933 --- /dev/null +++ b/plugins/processor_content_modifier/cm_config.h @@ -0,0 +1,33 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_PROCESSOR_CONTENT_MODIFIER_CONFIG_H +#define FLB_PROCESSOR_CONTENT_MODIFIER_CONFIG_H + +#include +#include + +#include "cm.h" + +struct content_modifier_ctx *cm_config_create(struct flb_processor_instance *ins, + struct flb_config *config); + +void cm_config_destroy(struct content_modifier_ctx *ctx); + +#endif \ No newline at end of file diff --git a/plugins/processor_content_modifier/cm_logs.c b/plugins/processor_content_modifier/cm_logs.c new file mode 100644 index 00000000000..2e5d2d61e66 --- /dev/null +++ b/plugins/processor_content_modifier/cm_logs.c @@ -0,0 +1,599 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cm.h" +#include "cm_utils.h" + +#include +#include + +static int hex_encode(unsigned char *input_buffer, + size_t input_length, + cfl_sds_t *output_buffer) +{ + const char hex[] = "0123456789abcdef"; + cfl_sds_t result; + size_t index; + + if (cfl_sds_alloc(*output_buffer) <= (input_length * 2)) { + result = cfl_sds_increase(*output_buffer, + (input_length * 2) - + cfl_sds_alloc(*output_buffer)); + + if (result == NULL) { + return FLB_FALSE; + } + + *output_buffer = result; + } + + for (index = 0; index < input_length; index++) { + (*output_buffer)[index * 2 + 0] = hex[(input_buffer[index] >> 4) & 0xF]; + (*output_buffer)[index * 2 + 1] = hex[(input_buffer[index] >> 0) & 0xF]; + } + + cfl_sds_set_len(*output_buffer, input_length * 2); + + (*output_buffer)[index * 2] = '\0'; + + return FLB_TRUE; +} + +static int hash_transformer(void *context, struct cfl_variant *value) +{ + unsigned char digest_buffer[32]; + struct cfl_variant *converted_value; + cfl_sds_t encoded_hash; + int result; + + if (value == NULL) { + return FLB_FALSE; + } + + result = cfl_variant_convert(value, + &converted_value, + CFL_VARIANT_STRING); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + if (cfl_sds_len(converted_value->data.as_string) == 0) { + cfl_variant_destroy(converted_value); + + return FLB_TRUE; + } + + result = flb_hash_simple(FLB_HASH_SHA256, + (unsigned char *) converted_value->data.as_string, + cfl_sds_len(converted_value->data.as_string), + digest_buffer, + sizeof(digest_buffer)); + + if (result != FLB_CRYPTO_SUCCESS) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + result = hex_encode(digest_buffer, + sizeof(digest_buffer), + &converted_value->data.as_string); + + if (result != FLB_TRUE) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + encoded_hash = cfl_sds_create(converted_value->data.as_string); + + if (encoded_hash == NULL) { + cfl_variant_destroy(converted_value); + + return FLB_FALSE; + } + + if (value->type == CFL_VARIANT_STRING || + value->type == CFL_VARIANT_BYTES) { + cfl_sds_destroy(value->data.as_string); + } + else if (value->type == CFL_VARIANT_ARRAY) { + cfl_array_destroy(value->data.as_array); + } + else if (value->type == CFL_VARIANT_KVLIST) { + cfl_kvlist_destroy(value->data.as_kvlist); + } + + value->type = CFL_VARIANT_STRING; + value->data.as_string = encoded_hash; + + return FLB_TRUE; +} + +cfl_sds_t cfl_variant_convert_to_json(struct cfl_variant *value) +{ + cfl_sds_t json_result; + mpack_writer_t writer; + char *data; + size_t size; + + data = NULL; + size = 0; + + mpack_writer_init_growable(&writer, &data, &size); + + pack_cfl_variant(&writer, value); + + mpack_writer_destroy(&writer); + + json_result = flb_msgpack_raw_to_json_sds(data, size); + + return json_result; +} + +int cfl_variant_convert(struct cfl_variant *input_value, + struct cfl_variant **output_value, + int output_type) +{ + int ret; + int64_t as_int; + char buf[64]; + double as_double; + char *converstion_canary; + struct cfl_variant *tmp = NULL; + int errno_backup; + + errno_backup = errno; + + /* input: string, bytes or reference */ + if (input_value->type == CFL_VARIANT_STRING || input_value->type == CFL_VARIANT_BYTES || + input_value->type == CFL_VARIANT_REFERENCE) { + + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + + tmp = cfl_variant_create_from_string_s(input_value->data.as_string, + cfl_sds_len(input_value->data.as_string)); + if (!tmp) { + return CFL_FALSE; + } + } + else if (output_type == CFL_VARIANT_BOOL) { + as_int = CFL_FALSE; + + if (strcasecmp(input_value->data.as_string, "true") == 0) { + as_int = CFL_TRUE; + } + else if (strcasecmp(input_value->data.as_string, "false") == 0) { + as_int = CFL_FALSE; + } + + tmp = cfl_variant_create_from_bool(as_int); + } + else if (output_type == CFL_VARIANT_INT) { + errno = 0; + as_int = strtoimax(input_value->data.as_string, &converstion_canary, 10); + if (errno == ERANGE || errno == EINVAL) { + errno = errno_backup; + return CFL_FALSE; + } + + tmp = cfl_variant_create_from_int64(as_int); + } + else if (output_type == CFL_VARIANT_DOUBLE) { + errno = 0; + converstion_canary = NULL; + as_double = strtod(input_value->data.as_string, &converstion_canary); + if (errno == ERANGE) { + errno = errno_backup; + return CFL_FALSE; + } + + if (as_double == 0 && converstion_canary == input_value->data.as_string) { + errno = errno_backup; + return CFL_FALSE; + } + + tmp = cfl_variant_create_from_double(as_double); + } + else { + return CFL_FALSE; + } + } + /* input: int */ + else if (input_value->type == CFL_VARIANT_INT) { + if (output_type == CFL_VARIANT_STRING || output_type == CFL_VARIANT_BYTES) { + ret = snprintf(buf, sizeof(buf), "%" PRIi64, input_value->data.as_int64); + if (ret < 0 || ret >= sizeof(buf)) { + return CFL_FALSE; + } + tmp = cfl_variant_create_from_string_s(buf, ret); + } + else if (output_type == CFL_VARIANT_BOOL) { + as_int = CFL_FALSE; + if (input_value->data.as_int64 != 0) { + as_int = CFL_TRUE; + } + + tmp = cfl_variant_create_from_bool(as_int); + } + else if (output_type == CFL_VARIANT_INT) { + /* same type, do nothing */ + } + else if (output_type == CFL_VARIANT_DOUBLE) { + as_double = (double) input_value->data.as_int64; + tmp = cfl_variant_create_from_double(as_double); + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_DOUBLE) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + + ret = snprintf(buf, sizeof(buf), "%.17g", input_value->data.as_double); + if (ret < 0 || ret >= sizeof(buf)) { + return CFL_FALSE; + } + tmp = cfl_variant_create_from_string_s(buf, ret); + } + else if (output_type == CFL_VARIANT_BOOL) { + as_int = CFL_FALSE; + + if (input_value->data.as_double != 0) { + as_int = CFL_TRUE; + } + + tmp = cfl_variant_create_from_bool(as_int); + } + else if (output_type == CFL_VARIANT_INT) { + as_int = (int64_t) round(input_value->data.as_double); + tmp = cfl_variant_create_from_int64(as_int); + } + else if (output_type == CFL_VARIANT_DOUBLE) { + as_double = input_value->data.as_int64; + tmp = cfl_variant_create_from_double(as_double); + } + else { + return CFL_FALSE; + } + } + else if (input_value->type == CFL_VARIANT_NULL) { + if (output_type == CFL_VARIANT_STRING || + output_type == CFL_VARIANT_BYTES) { + + tmp = cfl_variant_create_from_string_s("null", 4); + } + else if (output_type == CFL_VARIANT_BOOL) { + tmp = cfl_variant_create_from_bool(CFL_FALSE); + } + else if (output_type == CFL_VARIANT_INT) { + tmp = cfl_variant_create_from_int64(0); + } + else if (output_type == CFL_VARIANT_DOUBLE) { + tmp = cfl_variant_create_from_double(0); + } + else { + return CFL_FALSE; + } + } + else { + return CFL_FALSE; + } + + *output_value = tmp; + return FLB_TRUE; +} + +static struct cfl_kvpair *cfl_object_kvpair_get(struct cfl_object *obj, cfl_sds_t key) +{ + struct cfl_list *head; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + + + kvlist = obj->variant->data.as_kvlist; + cfl_list_foreach(head, &kvlist->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (cfl_sds_len(key) != cfl_sds_len(kvpair->key)) { + continue; + } + + if (strncmp(key, kvpair->key, cfl_sds_len(key)) == 0) { + return kvpair; + } + } + + return NULL; +} + +static int run_action_insert(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key, cfl_sds_t value) +{ + int ret; + struct cfl_kvlist *kvlist; + + /* check that the key don't exists */ + if (cfl_object_kvpair_get(obj, key)) { + /* Insert requires the key don't exists, we fail silently */ + return 0; + } + + /* insert the new value */ + kvlist = obj->variant->data.as_kvlist; + ret = cfl_kvlist_insert_string_s(kvlist, key, cfl_sds_len(key), value, cfl_sds_len(value)); + if (ret != 0) { + printf("Failed to insert key: %s\n", key); + return -1; + } + return 0; +} + +static int run_action_upsert(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key, cfl_sds_t value) +{ + int ret; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + + kvlist = obj->variant->data.as_kvlist; + + /* if the kv pair already exists, remove it from the list */ + kvpair = cfl_object_kvpair_get(obj, key); + if (kvpair) { + cfl_kvpair_destroy(kvpair); + } + + /* insert the key with the updated value */ + ret = cfl_kvlist_insert_string_s(kvlist, key, cfl_sds_len(key), value, cfl_sds_len(value)); + if (ret != 0) { + return -1; + } + + return 0; +} + +static int run_action_delete(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key) +{ + struct cfl_kvpair *kvpair; + + /* if the kv pair already exists, remove it from the list */ + kvpair = cfl_object_kvpair_get(obj, key); + if (kvpair) { + cfl_kvpair_destroy(kvpair); + return 0; + } + + return -1; +} + +static int run_action_rename(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key, cfl_sds_t value) +{ + cfl_sds_t tmp; + struct cfl_kvpair *kvpair; + + /* if the kv pair already exists, remove it from the list */ + kvpair = cfl_object_kvpair_get(obj, key); + if (!kvpair) { + return -1; + } + + tmp = kvpair->key; + + kvpair->key = cfl_sds_create_len(value, cfl_sds_len(value)); + if (!kvpair->key) { + /* restore previous value */ + kvpair->key = tmp; + return -1; + } + + /* destroy previous value */ + cfl_sds_destroy(tmp); + return 0; +} + +static int run_action_hash(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key) +{ + int ret; + struct cfl_kvpair *kvpair; + + /* if the kv pair already exists, remove it from the list */ + kvpair = cfl_object_kvpair_get(obj, key); + if (!kvpair) { + /* the key was not found, so it's ok */ + return 0; + } + + ret = hash_transformer(NULL, kvpair->val); + if (ret == FLB_FALSE) { + return -1; + } + + return 0; +} + +static void cb_extract_regex(const char *name, const char *value, size_t value_length, void *context) +{ + + struct cfl_kvlist *kvlist = (struct cfl_kvlist *) context; + + if (cfl_kvlist_contains(kvlist, (char *) name)) { + cfl_kvlist_remove(kvlist, (char *) name); + } + + cfl_kvlist_insert_string_s(kvlist, (char *) name, strlen(name), (char *) value, value_length); +} + +int run_action_extract(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key, struct flb_regex *regex) +{ + int ret; + int match_count; + struct flb_regex_search match_list; + struct cfl_kvpair *kvpair; + struct cfl_kvlist *kvlist; + struct cfl_variant *v; + + kvlist = obj->variant->data.as_kvlist; + + /* if the kv pair already exists, remove it from the list */ + kvpair = cfl_object_kvpair_get(obj, key); + if (!kvpair) { + return -1; + } + + v = kvpair->val; + if (v->type != CFL_VARIANT_STRING) { + return -1; + } + + match_count = flb_regex_do(regex, v->data.as_string, cfl_sds_len(v->data.as_string), &match_list); + if (match_count <= 0) { + return -1; + } + + ret = flb_regex_parse(regex, &match_list, cb_extract_regex, kvlist); + if (ret == -1) { + return -1; + } + + return 0; +} + +static int run_action_convert(struct content_modifier_ctx *ctx, + struct cfl_object *obj, + const char *tag, int tag_len, + cfl_sds_t key, int converted_type) +{ + int ret; + struct cfl_kvlist *kvlist; + struct cfl_kvpair *kvpair; + struct cfl_variant *v; + struct cfl_variant *converted; + + /* if the kv pair already exists, remove it from the list */ + kvpair = cfl_object_kvpair_get(obj, key); + if (!kvpair) { + return -1; + } + + /* convert the value */ + v = kvpair->val; + ret = cfl_variant_convert(v, &converted, converted_type); + if (ret != FLB_TRUE) { + return -1; + } + + /* remove the old kvpair */ + cfl_kvpair_destroy(kvpair); + + kvlist = obj->variant->data.as_kvlist; + ret = cfl_kvlist_insert_s(kvlist, key, cfl_sds_len(key), converted); + if (ret != 0) { + cfl_variant_destroy(converted); + return -1; + } + + return 0; +} + +int cm_logs_process(struct flb_processor_instance *ins, + struct content_modifier_ctx *ctx, + struct flb_mp_chunk_cobj *chunk_cobj, + const char *tag, + int tag_len) +{ + int ret = -1; + struct flb_mp_chunk_record *record; + struct cfl_object *obj = NULL; + + /* Iterate records */ + while ((ret = flb_mp_chunk_cobj_record_next(chunk_cobj, &record)) == FLB_MP_CHUNK_RECORD_OK) { + /* retrieve the target cfl object */ + if (ctx->context_type == CM_CONTEXT_LOG_METADATA) { + obj = record->cobj_metadata; + } + else if (ctx->context_type == CM_CONTEXT_LOG_BODY) { + obj = record->cobj_record; + } + + /* the operation on top of the data type is unsupported */ + if (obj->variant->type != CFL_VARIANT_KVLIST) { + cfl_object_destroy(obj); + return -1; + } + + /* process the action */ + if (ctx->action_type == CM_ACTION_INSERT) { + ret = run_action_insert(ctx, obj, tag, tag_len, ctx->key, ctx->value); + } + else if (ctx->action_type == CM_ACTION_UPSERT) { + ret = run_action_upsert(ctx, obj, tag, tag_len, ctx->key, ctx->value); + } + else if (ctx->action_type == CM_ACTION_DELETE) { + ret = run_action_delete(ctx, obj, tag, tag_len, ctx->key); + } + else if (ctx->action_type == CM_ACTION_RENAME) { + ret = run_action_rename(ctx, obj, tag, tag_len, ctx->key, ctx->value); + } + else if (ctx->action_type == CM_ACTION_HASH) { + ret = run_action_hash(ctx, obj, tag, tag_len, ctx->key); + } + else if (ctx->action_type == CM_ACTION_EXTRACT) { + ret = run_action_extract(ctx, obj, tag, tag_len, ctx->key, ctx->regex); + } + else if (ctx->action_type == CM_ACTION_CONVERT) { + ret = run_action_convert(ctx, obj, tag, tag_len, ctx->key, ctx->converted_type); + } + + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} diff --git a/plugins/processor_content_modifier/cm_traces.c b/plugins/processor_content_modifier/cm_traces.c new file mode 100644 index 00000000000..24783ba37fc --- /dev/null +++ b/plugins/processor_content_modifier/cm_traces.c @@ -0,0 +1,698 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +//#include "variant_utils.h" + +#include "cm.h" +#include "cm_utils.h" + +typedef int (*attribute_transformer) (void *, struct cfl_variant *value); + +static int span_contains_attribute(struct ctrace_span *span, + char *name) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + return cfl_kvlist_contains(span->attr->kv, name); +} + +static int span_remove_attribute(struct ctrace_span *span, + char *name) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + return cfl_kvlist_remove(span->attr->kv, name); +} + +static int span_update_attribute(struct ctrace_span *span, + char *name, + char *value) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + cfl_kvlist_remove(span->attr->kv, name); + + if (ctr_span_set_attribute_string(span, name, value) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_insert_attribute(struct ctrace_span *span, + char *name, + char *value) +{ + if (span->attr == NULL) { + return FLB_FALSE; + } + + if (ctr_span_set_attribute_string(span, name, value) != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_transform_attribute(struct ctrace_span *span, + char *name, + attribute_transformer transformer) +{ + struct cfl_variant *attribute; + + if (span->attr == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, name); + + if (attribute == NULL) { + return FLB_FALSE; + } + + return transformer(NULL, attribute); +} + +static int span_convert_attribute(struct ctrace_span *span, + cfl_sds_t key, int new_type) +{ + int ret; + struct cfl_variant *attribute; + struct cfl_variant *converted_attribute; + + if (span->attr == NULL) { + return FLB_FALSE; + } + + attribute = cfl_kvlist_fetch(span->attr->kv, key); + if (attribute == NULL) { + return FLB_FALSE; + } + + ret = cfl_variant_convert(attribute, + &converted_attribute, + new_type); + + if (ret != FLB_TRUE) { + return FLB_FALSE; + } + + ret = cfl_kvlist_remove(span->attr->kv, key); + if (ret != FLB_TRUE) { + return FLB_FALSE; + } + + + ret = cfl_kvlist_insert(span->attr->kv, key, converted_attribute); + if (ret != 0) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int span_rename_attribute(struct ctrace_span *span, + cfl_sds_t key, cfl_sds_t new_name) +{ + cfl_sds_t tmp; + struct cfl_list *head; + struct cfl_kvpair *kvpair; + + if (span->attr == NULL) { + return FLB_FALSE; + } + + cfl_list_foreach(head, &span->attr->kv->list) { + kvpair = cfl_list_entry(head, struct cfl_kvpair, _head); + + if (cfl_sds_len(key) != cfl_sds_len(kvpair->key)) { + continue; + } + + if (strncmp(key, kvpair->key, cfl_sds_len(key)) == 0) { + break; + } + + kvpair = NULL; + } + + if (!kvpair) { + return FLB_FALSE; + } + + tmp = kvpair->key; + kvpair->key = cfl_sds_create_len(new_name, cfl_sds_len(new_name)); + if (!kvpair->key) { + /* restore previous value */ + kvpair->key = tmp; + return FLB_FALSE; + } + + /* destroy previous value */ + cfl_sds_destroy(tmp); + + return FLB_TRUE; +} + +static void attribute_match_cb(const char *name, + const char *value, + size_t value_length, + void *context) +{ + cfl_sds_t temporary_value; + struct ctrace_span *span; + + temporary_value = cfl_sds_create_len(value, value_length); + + if (temporary_value != NULL) { + span = (struct ctrace_span *) context; + + if (span_contains_attribute(span, (char *) name) == FLB_TRUE) { + span_remove_attribute(span, (char *) name); + } + + ctr_span_set_attribute_string(span, (char *) name, temporary_value); + + cfl_sds_destroy(temporary_value); + } +} + +static int span_extract_attributes(struct ctrace_span *span, + cfl_sds_t key, + struct flb_regex *regex) + +{ + ssize_t match_count; + struct flb_regex_search match_list; + struct cfl_variant *attribute; + int result; + + attribute = cfl_kvlist_fetch(span->attr->kv, key); + if (attribute == NULL) { + return FLB_FALSE; + } + + if (attribute->type != CFL_VARIANT_STRING) { + return FLB_FALSE; + } + + match_count = flb_regex_do(regex, + attribute->data.as_string, + cfl_sds_len(attribute->data.as_string), + &match_list); + + if (match_count <= 0) { + return FLB_FALSE; + } + + result = flb_regex_parse(regex, + &match_list, + attribute_match_cb, + (void *) span); + if (result == -1) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +static int context_contains_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + return FLB_TRUE; + } + } + + return FLB_FALSE; +} + +static int hex_encode(unsigned char *input_buffer, + size_t input_length, + cfl_sds_t *output_buffer) +{ + const char hex[] = "0123456789abcdef"; + cfl_sds_t result; + size_t index; + + if (cfl_sds_alloc(*output_buffer) <= (input_length * 2)) { + result = cfl_sds_increase(*output_buffer, + (input_length * 2) - + cfl_sds_alloc(*output_buffer)); + + if (result == NULL) { + return FLB_FALSE; + } + + *output_buffer = result; + } + + for (index = 0; index < input_length; index++) { + (*output_buffer)[index * 2 + 0] = hex[(input_buffer[index] >> 4) & 0xF]; + (*output_buffer)[index * 2 + 1] = hex[(input_buffer[index] >> 0) & 0xF]; + } + + cfl_sds_set_len(*output_buffer, input_length * 2); + + (*output_buffer)[index * 2] = '\0'; + + return FLB_TRUE; +} + +static int hash_transformer(void *context, struct cfl_variant *value) +{ + unsigned char digest_buffer[32]; + struct cfl_variant *converted_value; + cfl_sds_t encoded_hash; + int result; + + if (value == NULL) { + return FLB_FALSE; + } + + result = cfl_variant_convert(value, + &converted_value, + CFL_VARIANT_STRING); + + if (result != FLB_TRUE) { + return FLB_FALSE; + } + + if (cfl_sds_len(converted_value->data.as_string) == 0) { + cfl_variant_destroy(converted_value); + return FLB_TRUE; + } + + result = flb_hash_simple(FLB_HASH_SHA256, + (unsigned char *) converted_value->data.as_string, + cfl_sds_len(converted_value->data.as_string), + digest_buffer, + sizeof(digest_buffer)); + + if (result != FLB_CRYPTO_SUCCESS) { + cfl_variant_destroy(converted_value); + return FLB_FALSE; + } + + result = hex_encode(digest_buffer, + sizeof(digest_buffer), + &converted_value->data.as_string); + + if (result != FLB_TRUE) { + cfl_variant_destroy(converted_value); + return FLB_FALSE; + } + + encoded_hash = cfl_sds_create(converted_value->data.as_string); + if (encoded_hash == NULL) { + cfl_variant_destroy(converted_value); + return FLB_FALSE; + } + cfl_variant_destroy(converted_value); + + + if (value->type == CFL_VARIANT_STRING || + value->type == CFL_VARIANT_BYTES) { + cfl_sds_destroy(value->data.as_string); + } + else if (value->type == CFL_VARIANT_ARRAY) { + cfl_array_destroy(value->data.as_array); + } + else if (value->type == CFL_VARIANT_KVLIST) { + cfl_kvlist_destroy(value->data.as_kvlist); + } + + value->type = CFL_VARIANT_STRING; + value->data.as_string = encoded_hash; + + return FLB_TRUE; +} + +static int traces_context_hash_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_transform_attribute(span, name, hash_transformer) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_remove_attribute(struct ctrace *traces_context, + char *name) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_remove_attribute(span, name) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_update_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_update_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_insert_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (!span_contains_attribute(span, name) == FLB_TRUE) { + if (span_insert_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_upsert_attribute(struct ctrace *traces_context, + char *name, + char *value) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + if (span_update_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + else { + if (span_insert_attribute(span, name, value) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_rename_attributes(struct ctrace *traces_context, + char *name, + char *value) +{ + int ret; + int renamed = 0; + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, name) == FLB_TRUE) { + ret = span_rename_attribute(span, name, value); + if (ret == FLB_FALSE) { + return FLB_FALSE; + } + renamed++; + } + } + + if (renamed) { + return FLB_TRUE; + } + + return FLB_FALSE; +} + +static int traces_context_convert_attribute(struct ctrace *traces_context, + char *key, int new_type) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, key) == FLB_TRUE) { + if (span_convert_attribute(span, key, new_type) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +static int traces_context_extract_attribute(struct ctrace *traces_context, + cfl_sds_t key, + struct flb_regex *regex) +{ + struct cfl_list *iterator; + struct ctrace_span *span; + + cfl_list_foreach(iterator, &traces_context->span_list) { + span = cfl_list_entry(iterator, + struct ctrace_span, _head_global); + + if (span_contains_attribute(span, key) == FLB_TRUE) { + if (span_extract_attributes(span, key, regex) != FLB_TRUE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + +int traces_update_attributes(struct ctrace *traces_context, struct cfl_list *attributes) +{ + struct cfl_list *iterator; + int result; + struct cfl_kv *pair; + + cfl_list_foreach(iterator, attributes) { + pair = cfl_list_entry(iterator, struct cfl_kv, _head); + + result = traces_context_update_attribute(traces_context, + pair->key, + pair->val); + + if (result == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + + + +static int traces_convert_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, + cfl_sds_t key, int converted_type) +{ + int ret; + + ret = traces_context_convert_attribute(traces_context, key, converted_type); + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int traces_extract_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, + cfl_sds_t key, struct flb_regex *regex) +{ + int ret; + + ret = traces_context_extract_attribute(traces_context, key, regex); + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int traces_insert_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, + cfl_sds_t key, cfl_sds_t value) +{ + int ret; + + ret = traces_context_insert_attribute(traces_context, key, value); + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int traces_rename_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, + cfl_sds_t key, cfl_sds_t new_name) +{ + int ret; + + ret = traces_context_rename_attributes(traces_context, key, new_name); + //ret = traces_context_rename_attribute( + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} +static int traces_upsert_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, + cfl_sds_t key, cfl_sds_t value) +{ + int ret; + + ret = traces_context_upsert_attribute(traces_context, key, value); + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} + +static int traces_delete_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, cfl_sds_t key) +{ + int ret; + + ret = context_contains_attribute(traces_context, key); + if (ret == FLB_TRUE) { + ret = traces_context_remove_attribute(traces_context, key); + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + + +static int traces_hash_attributes(struct content_modifier_ctx *ctx, struct ctrace *traces_context, cfl_sds_t key) +{ + int ret; + + ret = context_contains_attribute(traces_context, key); + if (ret == FLB_TRUE) { + ret = traces_context_hash_attribute(traces_context, key); + if (ret == FLB_FALSE) { + return FLB_PROCESSOR_FAILURE; + } + } + + return FLB_PROCESSOR_SUCCESS; +} + +int cm_traces_process(struct flb_processor_instance *ins, + struct content_modifier_ctx *ctx, + struct ctrace *traces_context, + const char *tag, int tag_len) +{ + int ret = -1; + + /* process the action */ + if (ctx->action_type == CM_ACTION_INSERT) { + ret = traces_insert_attributes(ctx, traces_context, ctx->key, ctx->value); + } + else if (ctx->action_type == CM_ACTION_UPSERT) { + ret = traces_upsert_attributes(ctx, traces_context, ctx->key, ctx->value); + } + else if (ctx->action_type == CM_ACTION_DELETE) { + ret = traces_delete_attributes(ctx, traces_context, ctx->key); + } + else if (ctx->action_type == CM_ACTION_RENAME) { + ret = traces_rename_attributes(ctx, traces_context, ctx->key, ctx->value); + } + else if (ctx->action_type == CM_ACTION_HASH) { + ret = traces_hash_attributes(ctx, traces_context, ctx->key); + } + else if (ctx->action_type == CM_ACTION_EXTRACT) { + ret = traces_extract_attributes(ctx, traces_context, ctx->key, ctx->regex); + } + else if (ctx->action_type == CM_ACTION_CONVERT) { + ret = traces_convert_attributes(ctx, traces_context, ctx->key, ctx->converted_type); + } + + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + return FLB_PROCESSOR_SUCCESS; +} diff --git a/plugins/processor_attributes/variant_utils.h b/plugins/processor_content_modifier/cm_utils.h similarity index 98% rename from plugins/processor_attributes/variant_utils.h rename to plugins/processor_content_modifier/cm_utils.h index 7ba3762737a..7116172603c 100644 --- a/plugins/processor_attributes/variant_utils.h +++ b/plugins/processor_content_modifier/cm_utils.h @@ -623,4 +623,11 @@ static inline int unpack_cfl_variant(mpack_reader_t *reader, return result; } +cfl_sds_t cfl_variant_convert_to_json(struct cfl_variant *value); +int cfl_variant_convert(struct cfl_variant *input_value, + struct cfl_variant **output_value, + int output_type); + + + #endif diff --git a/src/flb_lib.c b/src/flb_lib.c index 77d2397b401..b77f29b7a1d 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -697,7 +697,7 @@ int static do_start(flb_ctx_t *ctx) break; } else if (val == FLB_ENGINE_FAILED) { - flb_error("[lib] backend failed"); + flb_debug("[lib] backend failed"); #if defined(FLB_SYSTEM_MACOS) pthread_cancel(tid); #endif diff --git a/src/flb_log_event_decoder.c b/src/flb_log_event_decoder.c index 4a49a6d19dd..2f1a5b845e4 100644 --- a/src/flb_log_event_decoder.c +++ b/src/flb_log_event_decoder.c @@ -294,10 +294,7 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context, return context->last_result; } - memset(event, 0, sizeof(struct flb_log_event)); - previous_offset = context->offset; - result = msgpack_unpack_next(&context->unpacked_event, context->buffer, context->length, @@ -314,8 +311,8 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context, context->previous_offset = previous_offset; context->last_result = flb_event_decoder_decode_object(context, - event, - &context->unpacked_event.data); + event, + &context->unpacked_event.data); return context->last_result; } diff --git a/src/flb_mp.c b/src/flb_mp.c index e4ce9dcb60d..5fbc2fbca8a 100644 --- a/src/flb_mp.c +++ b/src/flb_mp.c @@ -22,10 +22,15 @@ #include #include #include +#include + #include #include #include +#include +#include + #include #include @@ -1014,3 +1019,206 @@ int flb_mp_cfl_to_msgpack(struct cfl_object *obj, char **out_buf, size_t *out_si return 0; } + +struct flb_mp_chunk_record *flb_mp_chunk_record_create(struct flb_mp_chunk_cobj *chunk_cobj) +{ + struct flb_mp_chunk_record *record; + + record = flb_calloc(1, sizeof(struct flb_mp_chunk_record)); + if (!record) { + flb_errno(); + return NULL; + } + record->modified = FLB_FALSE; + + return record; +} + +struct flb_mp_chunk_cobj *flb_mp_chunk_cobj_create(struct flb_log_event_encoder *log_encoder, struct flb_log_event_decoder *log_decoder) +{ + struct flb_mp_chunk_cobj *chunk_cobj; + + if (!log_encoder || !log_decoder) { + return NULL; + } + + chunk_cobj = flb_calloc(1, sizeof(struct flb_mp_chunk_cobj)); + if (!chunk_cobj) { + flb_errno(); + return NULL; + } + cfl_list_init(&chunk_cobj->records); + chunk_cobj->record_pos = NULL; + chunk_cobj->log_encoder = log_encoder; + chunk_cobj->log_decoder = log_decoder; + + return chunk_cobj; +} + +int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_buf, size_t *out_size) +{ + int ret; + char *mp_buf; + size_t mp_size; + struct cfl_list *head; + struct flb_mp_chunk_record *record; + + if (!chunk_cobj) { + return -1; + } + + /* Iterate all records */ + cfl_list_foreach(head, &chunk_cobj->records) { + record = cfl_list_entry(head, struct flb_mp_chunk_record, _head); + if (record->modified == FLB_TRUE) { + //FIXME WRITE RAW BUFFER + continue; + } + + ret = flb_log_event_encoder_begin_record(chunk_cobj->log_encoder); + if (ret == -1) { + return -1; + } + + ret = flb_log_event_encoder_set_timestamp(chunk_cobj->log_encoder, &record->event.timestamp); + if (ret == -1) { + return -1; + } + + if (record->cobj_metadata) { + ret = flb_mp_cfl_to_msgpack(record->cobj_metadata, &mp_buf, &mp_size); + if (ret == -1) { + return -1; + } + + ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(chunk_cobj->log_encoder, mp_buf, mp_size); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_free(mp_buf); + return -1; + } + flb_free(mp_buf); + } + + ret = flb_mp_cfl_to_msgpack(record->cobj_record, &mp_buf, &mp_size); + if (ret == -1) { + return -1; + } + + ret = flb_log_event_encoder_set_body_from_raw_msgpack(chunk_cobj->log_encoder, mp_buf, mp_size); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_free(mp_buf); + return -1; + } + flb_free(mp_buf); + + ret = flb_log_event_encoder_commit_record(chunk_cobj->log_encoder); + if (ret == -1) { + return -1; + } + } + + /* set new output buffer */ + *out_buf = chunk_cobj->log_encoder->output_buffer; + *out_size = chunk_cobj->log_encoder->output_length; + + flb_log_event_encoder_claim_internal_buffer_ownership(chunk_cobj->log_encoder); + return 0; +} + +int flb_mp_chunk_cobj_destroy(struct flb_mp_chunk_cobj *chunk_cobj) +{ + struct cfl_list *tmp; + struct cfl_list *head; + struct flb_mp_chunk_record *record; + + if (!chunk_cobj) { + return -1; + } + + cfl_list_foreach_safe(head, tmp, &chunk_cobj->records) { + record = cfl_list_entry(head, struct flb_mp_chunk_record, _head); + if (record->cobj_metadata) { + cfl_object_destroy(record->cobj_metadata); + } + if (record->cobj_record) { + cfl_object_destroy(record->cobj_record); + } + cfl_list_del(&record->_head); + flb_free(record); + } + + flb_free(chunk_cobj); + return 0; +} + +int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj, + struct flb_mp_chunk_record **out_record) +{ + int ret; + size_t bytes; + struct flb_mp_chunk_record *record = NULL; + + *out_record = NULL; + + bytes = chunk_cobj->log_decoder->length - chunk_cobj->log_decoder->offset; + + /* + * if there are remaining decoder bytes, keep iterating msgpack and populate + * the cobj list. Otherwise it means all the content is ready as a chunk_cobj_record. + */ + if (bytes > 0) { + record = flb_mp_chunk_record_create(chunk_cobj); + if (!record) { + return FLB_MP_CHUNK_RECORD_ERROR; + } + + ret = flb_log_event_decoder_next(chunk_cobj->log_decoder, &record->event); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_free(record); + return -1; + } + + record->cobj_metadata = flb_mp_object_to_cfl(record->event.metadata); + if (!record->cobj_metadata) { + flb_free(record); + return -1; + } + + record->cobj_record = flb_mp_object_to_cfl(record->event.body); + if (!record->cobj_record) { + cfl_object_destroy(record->cobj_metadata); + flb_free(record); + return -1; + } + + cfl_list_add(&record->_head, &chunk_cobj->records); + } + else if (chunk_cobj->record_pos == NULL) { + /* + * bytes is zero, if record_next is not set it means we need to start iterating from + * this list, just assign the first entry from it. + */ + record = cfl_list_entry_first(&chunk_cobj->records, struct flb_mp_chunk_record, _head); + } + else { + /* check if we are the last in the list */ + record = cfl_list_entry_first(&chunk_cobj->records, struct flb_mp_chunk_record, _head); + if (chunk_cobj->record_pos == record) { + return FLB_MP_CHUNK_RECORD_EOF; + } + + /* get the next entry from the list */ + record = cfl_list_entry_next(&record->_head, struct flb_mp_chunk_record, + _head, &chunk_cobj->records); + } + + chunk_cobj->record_pos = record; + *out_record = chunk_cobj->record_pos; + + return FLB_MP_CHUNK_RECORD_OK; +} + +int flb_mp_chunk_cobj_record_modified() +{ + return 0; +} diff --git a/src/flb_processor.c b/src/flb_processor.c index 6a7ece8e546..a7a6e2b20d9 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -239,7 +240,7 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, pu->unit_type = FLB_PROCESSOR_UNIT_NATIVE; /* create an instance of the processor */ - processor_instance = flb_processor_instance_create(config, unit_name, NULL); + processor_instance = flb_processor_instance_create(config, pu->event_type, unit_name, NULL); if (processor_instance == NULL) { flb_error("[processor] error creating native processor instance %s", pu->name); @@ -288,7 +289,7 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k for (i = 0; i < v->data.as_array->entry_count; i++) { val = v->data.as_array->entries[i]; ret = flb_filter_set_property(pu->ctx, k, val->data.as_string); - + if (ret == -1) { return ret; } @@ -335,7 +336,7 @@ int flb_processor_unit_init(struct flb_processor_unit *pu) if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { ret = flb_filter_init(proc->config, pu->ctx); - + if (ret == -1) { flb_error("[processor] error initializing unit filter %s", pu->name); return -1; @@ -371,7 +372,7 @@ int flb_processor_init(struct flb_processor *proc) mk_list_foreach(head, &proc->logs) { pu = mk_list_entry(head, struct flb_processor_unit, _head); ret = flb_processor_unit_init(pu); - + if (ret == -1) { return -1; } @@ -381,7 +382,7 @@ int flb_processor_init(struct flb_processor *proc) mk_list_foreach(head, &proc->metrics) { pu = mk_list_entry(head, struct flb_processor_unit, _head); ret = flb_processor_unit_init(pu); - + if (ret == -1) { return -1; } @@ -391,7 +392,7 @@ int flb_processor_init(struct flb_processor *proc) mk_list_foreach(head, &proc->traces) { pu = mk_list_entry(head, struct flb_processor_unit, _head); ret = flb_processor_unit_init(pu); - + if (ret == -1) { return -1; } @@ -413,6 +414,8 @@ int flb_processor_is_active(struct flb_processor *proc) return FLB_FALSE; } +#include + /* * This function will run all the processor units for the given tag and data, note * that depending of the 'type', 'data' can reference a msgpack for logs, a CMetrics @@ -426,17 +429,18 @@ int flb_processor_run(struct flb_processor *proc, void **out_buf, size_t *out_size) { int ret; + int finalize; void *cur_buf; size_t cur_size; void *tmp_buf; size_t tmp_size; - int decoder_result; struct mk_list *head; struct mk_list *list = NULL; - struct flb_log_event log_event; struct flb_processor_unit *pu; + struct flb_processor_unit *pu_next; struct flb_filter_instance *f_ins; struct flb_processor_instance *p_ins; + struct flb_mp_chunk_cobj *chunk_cobj = NULL; if (type == FLB_PROCESSOR_LOGS) { list = &proc->logs; @@ -500,7 +504,7 @@ int flb_processor_run(struct flb_processor *proc, * */ if (ret == FLB_FILTER_MODIFIED) { - + /* release intermediate buffer */ if (cur_buf != data) { flb_free(cur_buf); @@ -539,82 +543,100 @@ int flb_processor_run(struct flb_processor *proc, /* run the process callback */ if (type == FLB_PROCESSOR_LOGS) { if (p_ins->p->cb_process_logs != NULL) { - flb_log_event_encoder_reset(p_ins->log_encoder); - - decoder_result = flb_log_event_decoder_init( - p_ins->log_decoder, cur_buf, cur_size); - - if (decoder_result != FLB_EVENT_DECODER_SUCCESS) { - flb_log_event_decoder_reset(p_ins->log_decoder, NULL, 0); - if (cur_buf != data) { - flb_free(cur_buf); + /* if no previous chunkj_cobj exist, create instance. Note that this context will last + * until no more processors exists or the next one is a "filter" type processor. + */ + if (!chunk_cobj) { + flb_log_event_decoder_reset(p_ins->log_decoder, cur_buf, cur_size); + + /* create the context */ + chunk_cobj = flb_mp_chunk_cobj_create(p_ins->log_encoder, p_ins->log_decoder); + if (chunk_cobj == NULL) { + flb_log_event_decoder_reset(p_ins->log_decoder, NULL, 0); + if (cur_buf != data) { + flb_free(cur_buf); + } + + release_lock(&pu->lock, + FLB_PROCESSOR_LOCK_RETRY_LIMIT, + FLB_PROCESSOR_LOCK_RETRY_DELAY); + + return -1; } - - release_lock(&pu->lock, - FLB_PROCESSOR_LOCK_RETRY_LIMIT, - FLB_PROCESSOR_LOCK_RETRY_DELAY); - - return -1; } - ret = FLB_PROCESSOR_SUCCESS; - - do { - decoder_result = flb_log_event_decoder_next( - p_ins->log_decoder, - &log_event); + /* Invoke processor plugin callback */ + ret = p_ins->p->cb_process_logs(p_ins, chunk_cobj, tag, tag_len); + if (ret != FLB_PROCESSOR_SUCCESS) { + flb_warn("[processor] failed to process chunk"); + } + chunk_cobj->record_pos = NULL; + finalize = FLB_FALSE; - if (decoder_result == FLB_EVENT_DECODER_SUCCESS) { - ret = p_ins->p->cb_process_logs(p_ins, - p_ins->log_encoder, - &log_event, - tag, tag_len); + /* is this processing_unit the last one from the list ? */ + if (head->next == list ) { + finalize = FLB_TRUE; + } + else { + pu_next = mk_list_entry(head->next, struct flb_processor_unit, _head); + if (pu_next->unit_type == FLB_PROCESSOR_UNIT_FILTER) { + /* + * The next iterationm requires a raw msgpack buffer, let's do the + * encoding. + */ + finalize = FLB_TRUE; } } - while (decoder_result == FLB_EVENT_DECODER_SUCCESS && - ret == FLB_PROCESSOR_SUCCESS); - flb_log_event_decoder_reset(p_ins->log_decoder, NULL, 0); + if (finalize == FLB_TRUE) { + if (cfl_list_size(&chunk_cobj->records) == 0) { + flb_log_event_encoder_reset(p_ins->log_encoder); + flb_mp_chunk_cobj_destroy(chunk_cobj); - if (cur_buf != data) { - flb_free(cur_buf); - } + *out_buf = NULL; + *out_size = 0; - if (ret != FLB_PROCESSOR_SUCCESS) { - flb_log_event_encoder_reset(p_ins->log_encoder); - - release_lock(&pu->lock, - FLB_PROCESSOR_LOCK_RETRY_LIMIT, - FLB_PROCESSOR_LOCK_RETRY_DELAY); + release_lock(&pu->lock, + FLB_PROCESSOR_LOCK_RETRY_LIMIT, + FLB_PROCESSOR_LOCK_RETRY_DELAY); + return 0; + } - return -1; - } + /* encode chunk_cobj as msgpack */ + ret = flb_mp_chunk_cobj_encode(chunk_cobj, (char **) &tmp_buf, &tmp_size); + if (ret != 0) { + printf("failure\n"); + flb_log_event_decoder_reset(p_ins->log_decoder, NULL, 0); - if (p_ins->log_encoder->output_length == 0) { - flb_log_event_encoder_reset(p_ins->log_encoder); + if (cur_buf != data) { + flb_free(cur_buf); + } - *out_buf = NULL; - *out_size = 0; + release_lock(&pu->lock, + FLB_PROCESSOR_LOCK_RETRY_LIMIT, + FLB_PROCESSOR_LOCK_RETRY_DELAY); - release_lock(&pu->lock, - FLB_PROCESSOR_LOCK_RETRY_LIMIT, - FLB_PROCESSOR_LOCK_RETRY_DELAY); + return -1; + } - return 0; - } + if (cur_buf != data) { + flb_free(cur_buf); + } - flb_log_event_encoder_claim_internal_buffer_ownership(p_ins->log_encoder); + cur_buf = tmp_buf; + cur_size = tmp_size; - /* set new buffer */ - cur_buf = p_ins->log_encoder->output_buffer; - cur_size = p_ins->log_encoder->output_length; - flb_log_event_encoder_reset(p_ins->log_encoder); + flb_log_event_decoder_reset(p_ins->log_decoder, NULL, 0); + flb_log_event_encoder_claim_internal_buffer_ownership(p_ins->log_encoder); + flb_mp_chunk_cobj_destroy(chunk_cobj); + chunk_cobj = NULL; + } } } else if (type == FLB_PROCESSOR_METRICS) { - + if (p_ins->p->cb_process_metrics != NULL) { ret = p_ins->p->cb_process_metrics(p_ins, (struct cmt *) cur_buf, @@ -631,7 +653,7 @@ int flb_processor_run(struct flb_processor *proc, } } else if (type == FLB_PROCESSOR_TRACES) { - + if (p_ins->p->cb_process_traces != NULL) { ret = p_ins->p->cb_process_traces(p_ins, (struct ctrace *) cur_buf, @@ -714,7 +736,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s for (i = 0; i < array->entry_count; i++) { /* every entry in the array must be a map */ tmp = array->entries[i]; - + if (tmp->type != CFL_VARIANT_KVLIST) { return -1; } @@ -723,7 +745,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s /* get the processor name, this is a mandatory config field */ tmp = cfl_kvlist_fetch(kvlist, "name"); - + if (!tmp) { flb_error("processor configuration don't have a 'name' defined"); return -1; @@ -732,7 +754,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s /* create the processor unit and load all the properties */ name = tmp->data.as_string; pu = flb_processor_unit_create(proc, type, name); - + if (!pu) { flb_error("cannot create '%s' processor unit", name); return -1; @@ -741,7 +763,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s /* iterate list of properties and set each one (skip name) */ cfl_list_foreach(head, &kvlist->list) { pair = cfl_list_entry(head, struct cfl_kvpair, _head); - + if (strcmp(pair->key, "name") == 0) { continue; } @@ -750,10 +772,10 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s * we must release the pre-allocated '*' match at first. */ if (pu->unit_type == FLB_PROCESSOR_UNIT_FILTER) { - + if (strcmp(pair->key, "match") == 0) { f_ins = (struct flb_filter_instance *)pu->ctx; - + if (f_ins->match != NULL) { flb_sds_destroy(f_ins->match); f_ins->match = NULL; @@ -762,7 +784,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s } ret = flb_processor_unit_set_property(pu, pair->key, pair->val); - + if (ret == -1) { flb_error("cannot set property '%s' for processor '%s'", pair->key, name); return -1; @@ -782,10 +804,10 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str /* logs */ val = cfl_kvlist_fetch(g->properties, "logs"); - + if (val) { ret = load_from_config_format_group(proc, FLB_PROCESSOR_LOGS, val); - + if (ret == -1) { flb_error("failed to load 'logs' processors"); return -1; @@ -794,10 +816,10 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str /* metrics */ val = cfl_kvlist_fetch(g->properties, "metrics"); - + if (val) { ret = load_from_config_format_group(proc, FLB_PROCESSOR_METRICS, val); - + if (ret == -1) { flb_error("failed to load 'metrics' processors"); return -1; @@ -808,7 +830,7 @@ int flb_processors_load_from_config_format_group(struct flb_processor *proc, str val = cfl_kvlist_fetch(g->properties, "traces"); if (val) { ret = load_from_config_format_group(proc, FLB_PROCESSOR_TRACES, val); - + if (ret == -1) { flb_error("failed to load 'traces' processors"); return -1; @@ -848,7 +870,7 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, len = strlen(k); tmp = flb_env_var_translate(ins->config->env, v); - + if (!tmp) { return -1; } @@ -859,7 +881,7 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, else if (prop_key_check("log_level", k, len) == 0 && tmp) { ret = flb_log_get_level_str(tmp); flb_sds_destroy(tmp); - + if (ret == -1) { return -1; } @@ -871,9 +893,9 @@ int flb_processor_instance_set_property(struct flb_processor_instance *ins, * map it directly to avoid an extra memory allocation. */ kv = flb_kv_item_create(&ins->properties, (char *) k, NULL); - + if (!kv) { - + if (tmp) { flb_sds_destroy(tmp); } @@ -892,9 +914,9 @@ const char *flb_processor_instance_get_property( return flb_kv_get_key_value(key, &ins->properties); } -struct flb_processor_instance *flb_processor_instance_create( - struct flb_config *config, - const char *name, void *data) +struct flb_processor_instance *flb_processor_instance_create(struct flb_config *config, + int event_type, + const char *name, void *data) { struct flb_processor_instance *instance; struct flb_processor_plugin *plugin; @@ -907,7 +929,7 @@ struct flb_processor_instance *flb_processor_instance_create( mk_list_foreach(head, &config->processor_plugins) { plugin = mk_list_entry(head, struct flb_processor_plugin, _head); - + if (strcasecmp(plugin->name, name) == 0) { break; } @@ -919,12 +941,10 @@ struct flb_processor_instance *flb_processor_instance_create( } instance = flb_calloc(1, sizeof(struct flb_filter_instance)); - if (!instance) { flb_errno(); return NULL; } - instance->config = config; /* Get an ID */ @@ -934,7 +954,8 @@ struct flb_processor_instance *flb_processor_instance_create( snprintf(instance->name, sizeof(instance->name) - 1, "%s.%i", plugin->name, id); - instance->id = id; + instance->id = id; + instance->event_type = event_type; instance->alias = NULL; instance->p = plugin; instance->data = data; @@ -942,40 +963,31 @@ struct flb_processor_instance *flb_processor_instance_create( mk_list_init(&instance->properties); - instance->log_encoder = flb_log_event_encoder_create( - FLB_LOG_EVENT_FORMAT_DEFAULT); - + instance->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT); if (instance->log_encoder == NULL) { flb_plg_error(instance, "log event encoder initialization error"); - flb_processor_instance_destroy(instance); - instance = NULL; } - instance->log_decoder = flb_log_event_decoder_create(NULL, 0); + instance->log_decoder = flb_log_event_decoder_create(NULL, 0); if (instance->log_decoder == NULL) { flb_plg_error(instance, "log event decoder initialization error"); - flb_processor_instance_destroy(instance); - instance = NULL; } return instance; } -void flb_processor_instance_exit( - struct flb_processor_instance *ins, - struct flb_config *config) +void flb_processor_instance_exit(struct flb_processor_instance *ins, struct flb_config *config) { struct flb_processor_plugin *plugin; plugin = ins->p; - if (plugin->cb_exit != NULL && - ins->context != NULL) { + if (plugin->cb_exit != NULL && ins->context != NULL) { plugin->cb_exit(ins); } } @@ -1003,7 +1015,7 @@ int flb_processor_instance_check_properties( * instance in question. */ config_map = flb_config_map_create(config, p->config_map); - + if (!config_map) { flb_error("[native processor] error loading config map for '%s' plugin", p->name); @@ -1015,9 +1027,9 @@ int flb_processor_instance_check_properties( ret = flb_config_map_properties_check(ins->p->name, &ins->properties, ins->config_map); - + if (ret == -1) { - + if (config->program_name) { flb_helper("try the command: %s -F %s -h\n", config->program_name, ins->p->name); @@ -1084,15 +1096,12 @@ int flb_processor_instance_init( return 0; } -void flb_processor_instance_set_context( - struct flb_processor_instance *ins, - void *context) +void flb_processor_instance_set_context(struct flb_processor_instance *ins, void *context) { ins->context = context; } -void flb_processor_instance_destroy( - struct flb_processor_instance *ins) +void flb_processor_instance_destroy(struct flb_processor_instance *ins) { if (ins == NULL) { return;