diff --git a/plugins/filter_kubernetes/CMakeLists.txt b/plugins/filter_kubernetes/CMakeLists.txt index d3ebac7ef02..771fb1bbb23 100644 --- a/plugins/filter_kubernetes/CMakeLists.txt +++ b/plugins/filter_kubernetes/CMakeLists.txt @@ -4,6 +4,7 @@ set(src kube_regex.c kube_property.c kubernetes.c + kubernetes_aws.c ) FLB_PLUGIN(filter_kubernetes "${src}" "") diff --git a/plugins/filter_kubernetes/kube_conf.c b/plugins/filter_kubernetes/kube_conf.c index d4bcff6380a..36e67c4c2f8 100644 --- a/plugins/filter_kubernetes/kube_conf.c +++ b/plugins/filter_kubernetes/kube_conf.c @@ -189,6 +189,15 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins, flb_plg_info(ctx->ins, "https=%i host=%s port=%i", ctx->api_https, ctx->api_host, ctx->api_port); } + + ctx->aws_pod_service_hash_table = flb_hash_table_create_with_ttl(ctx->aws_pod_service_map_ttl, + FLB_HASH_TABLE_EVICT_OLDER, + FLB_HASH_TABLE_SIZE, + FLB_HASH_TABLE_SIZE); + if (!ctx->aws_pod_service_hash_table) { + flb_kube_conf_destroy(ctx); + return NULL; + } return ctx; } @@ -206,6 +215,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) flb_hash_table_destroy(ctx->namespace_hash_table); } + if (ctx->aws_pod_service_hash_table) { + flb_hash_table_destroy(ctx->aws_pod_service_hash_table); + } + if (ctx->merge_log == FLB_TRUE) { flb_free(ctx->unesc_buf); } @@ -214,6 +227,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) if (ctx->parser == NULL && ctx->regex) { flb_regex_destroy(ctx->regex); } + if (ctx->deploymentRegex) { + flb_regex_destroy(ctx->deploymentRegex); + } flb_free(ctx->api_host); flb_free(ctx->token); @@ -228,6 +244,18 @@ void flb_kube_conf_destroy(struct flb_kube *ctx) flb_upstream_destroy(ctx->kube_api_upstream); } + if (ctx->aws_pod_association_tls) { + flb_tls_destroy(ctx->aws_pod_association_tls); + } + + if (ctx->aws_pod_association_upstream) { + flb_upstream_destroy(ctx->aws_pod_association_upstream); + } + + if (ctx->platform) { + flb_free(ctx->platform); + } + #ifdef 
FLB_HAVE_TLS if (ctx->tls) { flb_tls_destroy(ctx->tls); diff --git a/plugins/filter_kubernetes/kube_conf.h b/plugins/filter_kubernetes/kube_conf.h index 3fad87a8c00..1c8cad038fb 100644 --- a/plugins/filter_kubernetes/kube_conf.h +++ b/plugins/filter_kubernetes/kube_conf.h @@ -65,8 +65,40 @@ #define FLB_KUBE_TAG_PREFIX "kube.var.log.containers." #endif +/* + * Maximum attribute length for Entity's KeyAttributes + * values + * https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html#:~:text=Maximum%20length%20of%201024. + */ +#define KEY_ATTRIBUTES_MAX_LEN 1024 +#define SERVICE_NAME_SOURCE_MAX_LEN 64 + +/* + * Configmap used for verifying whether if FluentBit is + * on EKS or native Kubernetes + */ +#define KUBE_SYSTEM_NAMESPACE "kube-system" +#define AWS_AUTH_CONFIG_MAP "aws-auth" + +/* + * Possible platform values for Kubernetes plugin + */ +#define NATIVE_KUBERNETES_PLATFORM "k8s" +#define EKS_PLATFORM "eks" + struct kube_meta; +struct service_attributes { + char name[KEY_ATTRIBUTES_MAX_LEN]; + int name_len; + char environment[KEY_ATTRIBUTES_MAX_LEN]; + int environment_len; + char name_source[SERVICE_NAME_SOURCE_MAX_LEN]; + int name_source_len; + int fields; + +}; + /* Filter context */ struct flb_kube { /* Configuration parameters */ @@ -124,6 +156,7 @@ struct flb_kube { /* Regex context to parse records */ struct flb_regex *regex; + struct flb_regex *deploymentRegex; struct flb_parser *parser; /* TLS CA certificate file */ @@ -165,6 +198,41 @@ struct flb_kube { int kube_meta_cache_ttl; int kube_meta_namespace_cache_ttl; + /* Configuration used for enabling pod to service name mapping*/ + int aws_use_pod_association; + char *aws_pod_association_host; + char *aws_pod_association_endpoint; + int aws_pod_association_port; + + /* + * TTL is used to check how long should the mapped entry + * remain in the hash table + */ + struct flb_hash_table *aws_pod_service_hash_table; + int aws_pod_service_map_ttl; + int 
aws_pod_service_map_refresh_interval; + flb_sds_t aws_pod_service_preload_cache_path; + struct flb_upstream *aws_pod_association_upstream; + /* + * This variable holds the Kubernetes platform type + * Currently checks for EKS or native Kubernetes + */ + char *platform; + /* + * This value is used for holding the platform config + * value. Platform will be overridden with this variable + * if it's set + */ + char *set_platform; + + //Agent TLS certs + struct flb_tls *aws_pod_association_tls; + char *aws_pod_association_host_server_ca_file; + char *aws_pod_association_host_client_cert_file; + char *aws_pod_association_host_client_key_file; + int aws_pod_association_host_tls_debug; + int aws_pod_association_host_tls_verify; + struct flb_tls *tls; struct flb_tls *kubelet_tls; diff --git a/plugins/filter_kubernetes/kube_meta.c b/plugins/filter_kubernetes/kube_meta.c index 83ba20167e3..f817725555f 100644 --- a/plugins/filter_kubernetes/kube_meta.c +++ b/plugins/filter_kubernetes/kube_meta.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,8 @@ #include "kube_conf.h" #include "kube_meta.h" #include "kube_property.h" +#include "kubernetes_aws.h" +#include "fluent-bit/flb_ra_key.h" #define FLB_KUBE_META_CONTAINER_STATUSES_KEY "containerStatuses" #define FLB_KUBE_META_CONTAINER_STATUSES_KEY_LEN \ @@ -354,7 +357,8 @@ static int get_meta_file_info(struct flb_kube *ctx, const char *namespace, */ static int get_meta_info_from_request(struct flb_kube *ctx, const char *namespace, - const char *podname, + const char *resource_type, + const char *resource_name, char **buffer, size_t *size, int *root_type, char* uri, @@ -411,9 +415,9 @@ static int get_meta_info_from_request(struct flb_kube *ctx, } ret = flb_http_do(c, &b_sent); - flb_plg_debug(ctx->ins, "Request (ns=%s, pod=%s) http_do=%i, " + flb_plg_debug(ctx->ins, "Request (ns=%s, %s=%s) http_do=%i, " "HTTP Status: %i", - namespace, podname, ret, c->resp.status); + namespace, resource_type, 
resource_name, ret, c->resp.status); if (ret != 0 || c->resp.status != 200) { if (c->resp.payload_size > 0) { @@ -463,7 +467,7 @@ static int get_pods_from_kubelet(struct flb_kube *ctx, } flb_plg_debug(ctx->ins, "Send out request to Kubelet for pods information."); - packed = get_meta_info_from_request(ctx, namespace, podname, + packed = get_meta_info_from_request(ctx, namespace, FLB_KUBE_POD, podname, &buf, &size, &root_type, uri, ctx->use_kubelet); } @@ -479,6 +483,44 @@ static int get_pods_from_kubelet(struct flb_kube *ctx, return 0; } +/* Gather metadata from API Server */ +int get_api_server_configmap(struct flb_kube *ctx, + const char *namespace, const char *configmap, + char **out_buf, size_t *out_size) +{ + int ret; + int packed = -1; + int root_type; + char uri[1024]; + char *buf; + size_t size; + + *out_buf = NULL; + *out_size = 0; + + + ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_CONFIGMAP_FMT, namespace, + configmap); + + if (ret < 0) { + return -1; + } + flb_plg_debug(ctx->ins, + "Send out request to API Server for configmap information"); + packed = get_meta_info_from_request(ctx, namespace, FLB_KUBE_CONFIGMAP, configmap, + &buf, &size, &root_type, uri, false); + + /* validate pack */ + if (packed == -1) { + return -1; + } + + *out_buf = buf; + *out_size = size; + + return 0; +} + /* Gather namespace metadata from API Server */ static int get_namespace_api_server_info(struct flb_kube *ctx, const char *namespace, char **out_buf, size_t *out_size) @@ -506,7 +548,7 @@ static int get_namespace_api_server_info(struct flb_kube *ctx, const char *names flb_plg_debug(ctx->ins, "Send out request to API Server for namespace information: %s", uri); // Namespace data is only available from kuberenetes api, not kubelet - packed = get_meta_info_from_request(ctx, namespace, "", + packed = get_meta_info_from_request(ctx, namespace, "","", &buf, &size, &root_type, uri, FLB_FALSE); } @@ -550,7 +592,7 @@ static int get_pod_api_server_info(struct flb_kube *ctx, } 
flb_plg_debug(ctx->ins, "Send out request to API Server for pods information"); - packed = get_meta_info_from_request(ctx, namespace, podname, + packed = get_meta_info_from_request(ctx, namespace, FLB_KUBE_POD, podname, &buf, &size, &root_type, uri, ctx->use_kubelet); } @@ -772,6 +814,154 @@ static void extract_container_hash(struct flb_kube_meta *meta, } } +static void cb_results_workload(const char *name, const char *value, + size_t vlen, void *data) +{ + if (name == NULL || value == NULL || vlen == 0 || data == NULL) { + return; + } + + struct flb_kube_meta *meta = data; + + if (meta->workload == NULL && strcmp(name, "deployment") == 0) { + meta->workload = flb_strndup(value, vlen); + meta->workload_len = vlen; + meta->fields++; + } +} + +/* + * Search workload based on the following priority + * where the top is highest priority. This is done + * to find the owner of the pod which helps with + * determining the upper-level management of the pod + * 1. Deployment name + * 2. StatefulSet name + * 3. DaemonSet name + * 4. Job name + * 5. CronJob name + * 6. Pod name + * 7. 
Container name + */ +static void search_workload(struct flb_kube_meta *meta, struct flb_kube *ctx, + msgpack_object map) +{ + int workload_found = FLB_FALSE; + int regex_found; + int ret; + struct flb_record_accessor *ra_name = NULL; + struct flb_record_accessor *ra_owner_refs = NULL; + struct flb_record_accessor *ra_kind = NULL; + struct flb_record_accessor *ra_owner_name = NULL; + struct flb_ra_value *name_val = NULL; + struct flb_ra_value *owner_refs_val = NULL; + struct flb_ra_value *kind_val = NULL; + struct flb_ra_value *owner_name_val = NULL; + struct flb_regex_search result; + + ra_name = flb_ra_create("$name", FLB_FALSE); + ra_owner_refs = flb_ra_create("$ownerReferences[0]", FLB_FALSE); + + if (!ra_name || !ra_owner_refs) { + goto cleanup; + } + + name_val = flb_ra_get_value_object(ra_name, map); + if (!name_val || name_val->type != FLB_RA_STRING || + name_val->o.via.str.size != meta->podname_len || + strncmp(name_val->o.via.str.ptr, meta->podname, meta->podname_len) != 0) { + goto cleanup; + } + + owner_refs_val = flb_ra_get_value_object(ra_owner_refs, map); + if (!owner_refs_val || owner_refs_val->o.type != MSGPACK_OBJECT_MAP) { + goto fallback_workload; + } + + ra_kind = flb_ra_create("$kind", FLB_FALSE); + ra_owner_name = flb_ra_create("$name", FLB_FALSE); + + if (!ra_kind || !ra_owner_name) { + goto cleanup; + } + + kind_val = flb_ra_get_value_object(ra_kind, owner_refs_val->o); + owner_name_val = flb_ra_get_value_object(ra_owner_name, owner_refs_val->o); + + if (kind_val && owner_name_val && + kind_val->type == FLB_RA_STRING && owner_name_val->type == FLB_RA_STRING) { + + if (kind_val->o.via.str.size == 10 && + strncmp(kind_val->o.via.str.ptr, "ReplicaSet", 10) == 0) { + regex_found = flb_regex_do(ctx->deploymentRegex, + owner_name_val->o.via.str.ptr, + owner_name_val->o.via.str.size, + &result); + if (regex_found > 0) { + ret = flb_regex_parse(ctx->deploymentRegex, &result, + cb_results_workload, meta); + if (ret == -1) { + goto cleanup; + } + } + 
else { + meta->workload = flb_strndup(owner_name_val->o.via.str.ptr, + owner_name_val->o.via.str.size); + meta->workload_len = owner_name_val->o.via.str.size; + meta->fields++; + } + } + else { + meta->workload = flb_strndup(owner_name_val->o.via.str.ptr, + owner_name_val->o.via.str.size); + meta->workload_len = owner_name_val->o.via.str.size; + meta->fields++; + } + workload_found = FLB_TRUE; + } + +fallback_workload: + if (!workload_found) { + if (meta->podname != NULL) { + meta->workload = flb_strndup(meta->podname, meta->podname_len); + meta->workload_len = meta->podname_len; + meta->fields++; + } + else if (meta->container_name != NULL) { + meta->workload = flb_strndup(meta->container_name, + meta->container_name_len); + meta->workload_len = meta->container_name_len; + meta->fields++; + } + } + +cleanup: + if (ra_name) { + flb_ra_destroy(ra_name); + } + if (ra_owner_refs) { + flb_ra_destroy(ra_owner_refs); + } + if (ra_kind) { + flb_ra_destroy(ra_kind); + } + if (ra_owner_name) { + flb_ra_destroy(ra_owner_name); + } + if (name_val) { + flb_ra_key_value_destroy(name_val); + } + if (owner_refs_val) { + flb_ra_key_value_destroy(owner_refs_val); + } + if (kind_val) { + flb_ra_key_value_destroy(kind_val); + } + if (owner_name_val) { + flb_ra_key_value_destroy(owner_name_val); + } +} + static int search_podname_and_namespace(struct flb_kube_meta *meta, struct flb_kube *ctx, msgpack_object map) @@ -1125,7 +1315,9 @@ static int merge_pod_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, int have_owner_references = -1; int have_nodename = -1; int have_podip = -1; + int pod_service_found = -1; size_t off = 0; + size_t tmp_service_attr_size = 0; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; @@ -1140,6 +1332,7 @@ static int merge_pod_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, msgpack_object api_map; msgpack_object ann_map; struct flb_kube_props props = {0}; + struct service_attributes *tmp_service_attributes = {0}; /* * - reg_buf: is a msgpack Map 
containing meta captured using Regex @@ -1196,6 +1389,9 @@ static int merge_pod_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, k = api_map.via.map.ptr[i].key; if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) { meta_val = api_map.via.map.ptr[i].val; + if (ctx->aws_use_pod_association) { + search_workload(meta, ctx, meta_val); + } if (meta_val.type == MSGPACK_OBJECT_MAP) { meta_found = FLB_TRUE; } @@ -1279,6 +1475,17 @@ static int merge_pod_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, } } } + if (ctx->aws_use_pod_association) { + pod_service_found = flb_hash_table_get(ctx->aws_pod_service_hash_table, + meta->podname, meta->podname_len, + &tmp_service_attributes, &tmp_service_attr_size); + if (pod_service_found != -1 && tmp_service_attributes != NULL) { + map_size += tmp_service_attributes->fields; + } + if (ctx->platform) { + map_size++; + } + } /* Set map size: current + pod_id, labels and annotations */ map_size += meta->fields; @@ -1297,6 +1504,54 @@ static int merge_pod_meta(struct flb_kube_meta *meta, struct flb_kube *ctx, msgpack_pack_str(&mp_pck, meta->namespace_len); msgpack_pack_str_body(&mp_pck, meta->namespace, meta->namespace_len); } + if (ctx->aws_use_pod_association) { + if (pod_service_found != -1 && tmp_service_attributes != NULL) { + if (tmp_service_attributes->name[0] != '\0') { + msgpack_pack_str(&mp_pck, 23); + msgpack_pack_str_body(&mp_pck, "aws_entity_service_name", 23); + msgpack_pack_str(&mp_pck, tmp_service_attributes->name_len); + msgpack_pack_str_body(&mp_pck, + tmp_service_attributes->name, + tmp_service_attributes->name_len); + } + if (tmp_service_attributes->environment[0] != '\0') { + msgpack_pack_str(&mp_pck, 22); + msgpack_pack_str_body(&mp_pck, "aws_entity_environment", 22); + msgpack_pack_str(&mp_pck, tmp_service_attributes->environment_len); + msgpack_pack_str_body(&mp_pck, + tmp_service_attributes->environment, + tmp_service_attributes->environment_len); + } + if 
(tmp_service_attributes->name_source[0] != '\0') { + msgpack_pack_str(&mp_pck, 22); + msgpack_pack_str_body(&mp_pck, "aws_entity_name_source", 22); + msgpack_pack_str(&mp_pck, tmp_service_attributes->name_source_len); + msgpack_pack_str_body(&mp_pck, + tmp_service_attributes->name_source, + tmp_service_attributes->name_source_len); + } + } + + if (ctx->platform != NULL) { + int platform_len = strlen(ctx->platform); + msgpack_pack_str(&mp_pck, 19); + msgpack_pack_str_body(&mp_pck, "aws_entity_platform", 19); + msgpack_pack_str(&mp_pck, platform_len); + msgpack_pack_str_body(&mp_pck, ctx->platform, platform_len); + } + if (meta->cluster != NULL) { + msgpack_pack_str(&mp_pck, 18); + msgpack_pack_str_body(&mp_pck, "aws_entity_cluster", 18); + msgpack_pack_str(&mp_pck, meta->cluster_len); + msgpack_pack_str_body(&mp_pck, meta->cluster, meta->cluster_len); + } + if (meta->workload != NULL) { + msgpack_pack_str(&mp_pck, 19); + msgpack_pack_str_body(&mp_pck, "aws_entity_workload", 19); + msgpack_pack_str(&mp_pck, meta->workload_len); + msgpack_pack_str_body(&mp_pck, meta->workload, meta->workload_len); + } + } /* Append API Server content */ if (have_uid >= 0) { @@ -1561,8 +1816,11 @@ static inline int extract_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *meta) { size_t off = 0; + size_t tmp_service_attr_size = 0; ssize_t n; int ret; + int pod_service_found; + struct service_attributes *tmp_service_attributes = {0}; /* Reset meta context */ memset(meta, '\0', sizeof(struct flb_kube_meta)); @@ -1582,6 +1840,23 @@ static inline int extract_pod_meta(struct flb_kube *ctx, if (ctx->cache_use_docker_id && meta->docker_id) { n += meta->docker_id_len + 1; } + + pod_service_found = flb_hash_table_get(ctx->aws_pod_service_hash_table, + meta->podname, meta->podname_len, + &tmp_service_attributes, &tmp_service_attr_size); + + if (pod_service_found != -1 && tmp_service_attributes != NULL) { + if (tmp_service_attributes->name[0] != '\0') { + n += tmp_service_attributes->name_len 
+ 1; + } + if (tmp_service_attributes->environment[0] != '\0') { + n += tmp_service_attributes->environment_len + 1; + } + if (tmp_service_attributes->name_source[0] != '\0') { + n += tmp_service_attributes->name_source_len + 1; + } + } + meta->cache_key = flb_malloc(n); if (!meta->cache_key) { flb_errno(); @@ -1613,6 +1888,27 @@ static inline int extract_pod_meta(struct flb_kube *ctx, off += meta->docker_id_len; } + if (pod_service_found != -1 && tmp_service_attributes != NULL) { + if (tmp_service_attributes->name[0] != '\0') { + meta->cache_key[off++] = ':'; + memcpy(meta->cache_key + off, tmp_service_attributes->name, + tmp_service_attributes->name_len); + off += tmp_service_attributes->name_len; + } + if (tmp_service_attributes->environment[0] != '\0') { + meta->cache_key[off++] = ':'; + memcpy(meta->cache_key + off, tmp_service_attributes->environment, + tmp_service_attributes->environment_len); + off += tmp_service_attributes->environment_len; + } + if (tmp_service_attributes->name_source[0] != '\0') { + meta->cache_key[off++] = ':'; + memcpy(meta->cache_key + off, tmp_service_attributes->name_source, + tmp_service_attributes->name_source_len); + off += tmp_service_attributes->name_source_len; + } + } + meta->cache_key[off] = '\0'; meta->cache_key_len = off; } @@ -1661,7 +1957,9 @@ static int get_and_merge_pod_meta(struct flb_kube *ctx, struct flb_kube_meta *me int ret; char *api_buf; size_t api_size; - + if (ctx->aws_use_pod_association) { + get_cluster_from_environment(ctx, meta); + } if (ctx->use_tag_for_meta) { ret = merge_meta_from_tag(ctx, meta, out_buf, out_size); return ret; @@ -1715,6 +2013,34 @@ static int wait_for_dns(struct flb_kube *ctx) return -1; } +int flb_kube_pod_association_init(struct flb_kube *ctx, struct flb_config *config) +{ + ctx->aws_pod_association_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + ctx->aws_pod_association_host_tls_verify, + ctx->aws_pod_association_host_tls_debug, + NULL, NULL, + 
ctx->aws_pod_association_host_server_ca_file, + ctx->aws_pod_association_host_client_cert_file, + ctx->aws_pod_association_host_client_key_file, + NULL); + if (!ctx->aws_pod_association_tls) { + flb_plg_error(ctx->ins, "[kube_meta] could not create TLS config for pod association host"); + return -1; + } + ctx->aws_pod_association_upstream = flb_upstream_create(config, + ctx->aws_pod_association_host, + ctx->aws_pod_association_port, + FLB_IO_TLS, ctx->aws_pod_association_tls); + if (!ctx->aws_pod_association_upstream) { + flb_plg_error(ctx->ins, "kube network init create pod association upstream failed"); + flb_tls_destroy(ctx->aws_pod_association_tls); + ctx->aws_pod_association_tls = NULL; + return -1; + } + flb_upstream_thread_safe(ctx->aws_pod_association_upstream); + return 0; +} + static int flb_kubelet_network_init(struct flb_kube *ctx, struct flb_config *config) { int ret; @@ -1782,6 +2108,8 @@ static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config int kubelet_network_init_ret = 0; ctx->kube_api_upstream = NULL; + ctx->aws_pod_association_upstream = NULL; + ctx->aws_pod_association_tls = NULL; /* Initialize Kube API Connection */ if (ctx->api_https == FLB_TRUE) { @@ -1825,6 +2153,11 @@ static int flb_kube_network_init(struct flb_kube *ctx, struct flb_config *config /* Remove async flag from upstream */ flb_stream_disable_async_mode(&ctx->kube_api_upstream->base); + /* Continue the filter kubernetes plugin functionality if the pod_association fails */ + if (ctx->aws_use_pod_association) { + flb_kube_pod_association_init(ctx, config); + } + kubelet_network_init_ret = flb_kubelet_network_init(ctx, config); return kubelet_network_init_ret; } @@ -1896,6 +2229,17 @@ int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config) return -1; } + + ctx->platform = NULL; + if (ctx->aws_use_pod_association) { + ret = determine_platform(ctx); + if (ret == -1) { + ctx->platform = flb_strdup(NATIVE_KUBERNETES_PLATFORM); + } + else { + 
ctx->platform = flb_strdup(EKS_PLATFORM); + } + } flb_plg_info(ctx->ins, "connectivity OK"); flb_free(meta_buf); } @@ -2158,5 +2502,13 @@ int flb_kube_meta_release(struct flb_kube_meta *meta) flb_free(meta->cache_key); } + if (meta->workload) { + flb_free(meta->workload); + } + + if (meta->cluster) { + flb_free(meta->cluster); + } + return r; } diff --git a/plugins/filter_kubernetes/kube_meta.h b/plugins/filter_kubernetes/kube_meta.h index e4e3b06685a..50eb38a7c2b 100644 --- a/plugins/filter_kubernetes/kube_meta.h +++ b/plugins/filter_kubernetes/kube_meta.h @@ -27,6 +27,7 @@ struct flb_kube; struct flb_kube_meta { int fields; + int cluster_len; int namespace_len; int podname_len; int cache_key_len; @@ -34,12 +35,15 @@ struct flb_kube_meta { int docker_id_len; int container_hash_len; int container_image_len; + int workload_len; + char *cluster; char *namespace; char *podname; char *container_name; char *container_image; char *docker_id; + char *workload; char *container_hash; /* set only on Systemd mode */ @@ -54,8 +58,13 @@ struct flb_kube_meta { #define FLB_KUBE_API_PORT 443 #define FLB_KUBE_API_POD_FMT "/api/v1/namespaces/%s/pods/%s" #define FLB_KUBE_API_NAMESPACE_FMT "/api/v1/namespaces/%s" +#define FLB_KUBE_API_CONFIGMAP_FMT "/api/v1/namespaces/%s/configmaps/%s" #define FLB_KUBELET_PODS "/pods" +/* Constants for possible kubernetes resources */ +#define FLB_KUBE_POD "pod" +#define FLB_KUBE_CONFIGMAP "configmap" + int flb_kube_meta_init(struct flb_kube *ctx, struct flb_config *config); int flb_kube_meta_fetch(struct flb_kube *ctx); int flb_kube_dummy_meta_get(char **out_buf, size_t *out_size); @@ -63,11 +72,13 @@ int flb_kube_meta_get(struct flb_kube *ctx, const char *tag, int tag_len, const char *data, size_t data_size, const char **out_buf, size_t *out_size, - const char **namespace_out_buf, + const char **namespace_out_buf, size_t *namespace_out_size, struct flb_kube_meta *meta, struct flb_kube_props *props, struct flb_kube_meta *namespace_meta); int 
flb_kube_meta_release(struct flb_kube_meta *meta); +int flb_kube_pod_association_init(struct flb_kube *ctx, struct flb_config *config); +int get_api_server_configmap(struct flb_kube *ctx, const char *namespace, const char *configmap, char **out_buf, size_t *out_size); #endif diff --git a/plugins/filter_kubernetes/kube_regex.c b/plugins/filter_kubernetes/kube_regex.c index f9288d3afe1..6f62552e9c1 100644 --- a/plugins/filter_kubernetes/kube_regex.c +++ b/plugins/filter_kubernetes/kube_regex.c @@ -34,10 +34,14 @@ int flb_kube_regex_init(struct flb_kube *ctx) ctx->regex = flb_regex_create(KUBE_TAG_TO_REGEX); } } + ctx->deploymentRegex = flb_regex_create(DEPLOYMENT_REGEX); if (!ctx->regex) { return -1; } + if (!ctx->deploymentRegex) { + return -1; + } return 0; } diff --git a/plugins/filter_kubernetes/kube_regex.h b/plugins/filter_kubernetes/kube_regex.h index bd94d9d93cb..17d92e32830 100644 --- a/plugins/filter_kubernetes/kube_regex.h +++ b/plugins/filter_kubernetes/kube_regex.h @@ -26,6 +26,8 @@ #define KUBE_JOURNAL_TO_REGEX "^(?[^_]+)_(?[^\\._]+)(\\.(?[^_]+))?_(?[^_]+)_(?[^_]+)_[^_]+_[^_]+$" +#define DEPLOYMENT_REGEX "^(?.+)-(?[bcdfghjklmnpqrstvwxz2456789]{6,10})$" + int flb_kube_regex_init(struct flb_kube *ctx); #endif diff --git a/plugins/filter_kubernetes/kubernetes.c b/plugins/filter_kubernetes/kubernetes.c index c2e349b0aa6..0e2c80ffc18 100644 --- a/plugins/filter_kubernetes/kubernetes.c +++ b/plugins/filter_kubernetes/kubernetes.c @@ -31,15 +31,44 @@ #include "kube_meta.h" #include "kube_regex.h" #include "kube_property.h" +#include "kubernetes_aws.h" #include #include +#include /* Merge status used by merge_log_handler() */ #define MERGE_NONE 0 /* merge unescaped string in temporary buffer */ #define MERGE_PARSED 1 /* merge parsed string (log_buf) */ #define MERGE_MAP 2 /* merge direct binary object (v) */ +struct task_args { + struct flb_kube *ctx; + char *api_server_url; +}; + +pthread_mutex_t metadata_mutex; +pthread_t background_thread; +struct task_args 
*task_args = {0}; +struct mk_event_loop *evl; + +void *update_pod_service_map(void *arg) +{ + flb_engine_evl_init(); + evl = mk_event_loop_create(256); + if (evl == NULL) { + flb_plg_error(task_args->ctx->ins, + "Failed to create event loop for pod service map"); + return NULL; + } + flb_engine_evl_set(evl); + while (1) { + fetch_pod_service_map(task_args->ctx,task_args->api_server_url,&metadata_mutex); + flb_plg_debug(task_args->ctx->ins, "Updating pod to service map after %d seconds", task_args->ctx->aws_pod_service_map_refresh_interval); + sleep(task_args->ctx->aws_pod_service_map_refresh_interval); + } +} + static int get_stream(msgpack_object_map map) { int i; @@ -208,13 +237,34 @@ static int cb_kube_init(struct flb_filter_instance *f_ins, */ flb_kube_meta_init(ctx, config); +/* + * Init separate thread for calling pod to + * service map + */ + pthread_mutex_init(&metadata_mutex, NULL); + + if (ctx->aws_use_pod_association) { + task_args = flb_malloc(sizeof(struct task_args)); + if (!task_args) { + flb_errno(); + return -1; + } + task_args->ctx = ctx; + task_args->api_server_url = ctx->aws_pod_association_endpoint; + if (pthread_create(&background_thread, NULL, update_pod_service_map, NULL) != 0) { + flb_error("Failed to create background thread"); + background_thread = 0; + flb_free(task_args); + } + } + return 0; } static int pack_map_content(struct flb_log_event_encoder *log_encoder, msgpack_object source_map, const char *kube_buf, size_t kube_size, - const char *namespace_kube_buf, + const char *namespace_kube_buf, size_t namespace_kube_size, struct flb_time *time_lookup, struct flb_parser *parser, @@ -521,7 +571,7 @@ static int pack_map_content(struct flb_log_event_encoder *log_encoder, off = 0; msgpack_unpacked_init(&result); - msgpack_unpack_next(&result, namespace_kube_buf, + msgpack_unpack_next(&result, namespace_kube_buf, namespace_kube_size, &off); if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -763,8 +813,20 @@ static int cb_kube_exit(void *data, 
struct flb_config *config) struct flb_kube *ctx; ctx = data; + flb_kube_conf_destroy(ctx); + if (background_thread) { + pthread_cancel(background_thread); + pthread_join(background_thread, NULL); + } + pthread_mutex_destroy(&metadata_mutex); + if (task_args) { + flb_free(task_args); + } + if (evl) { + mk_event_loop_destroy(evl); + } return 0; } @@ -1062,11 +1124,118 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_TIME, "kube_meta_namespace_cache_ttl", "15m", 0, FLB_TRUE, offsetof(struct flb_kube, kube_meta_namespace_cache_ttl), - "configurable TTL for K8s cached namespace metadata. " + "configurable TTL for K8s cached namespace metadata. " "By default, it is set to 15m and cached entries will be evicted after 15m." "Setting this to 0 will disable the cache TTL and " "will evict entries once the cache reaches capacity." }, + + /* + * Enable pod to service name association logics + * This can be configured with endpoint that returns a response with the corresponding + * podname in relation to the service name. For example, if there is a pod named "petclinic-12345" + * then in order to associate a service name to pod "petclinic-12345", the JSON response to the endpoint + * must follow the below patterns + * { + * "petclinic-12345": { + * "ServiceName":"petclinic", + * "Environment":"default" + * } + * } + */ + { + FLB_CONFIG_MAP_BOOL, "aws_use_pod_association", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_use_pod_association), + "use custom endpoint to get pod to service name mapping" + }, + { + FLB_CONFIG_MAP_BOOL, "use_pod_association", "false", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_use_pod_association), + "use custom endpoint to get pod to service name mapping. " + "this config option is kept for backward compatibility for " + "AWS Observability users and will be deprecated in favor of " + "aws_use_pod_association." 
+ }, + /* + * The host used for pod to service name association , default is 127.0.0.1 + * Will only check when "use_pod_association" config is set to true + */ + { + FLB_CONFIG_MAP_STR, "aws_pod_association_host", "cloudwatch-agent.amazon-cloudwatch", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_host), + "host to connect with when performing pod to service name association" + }, + /* + * The endpoint used for pod to service name association, default is /kubernetes/pod-to-service-env-map + * Will only check when "use_pod_association" config is set to true + */ + { + FLB_CONFIG_MAP_STR, "aws_pod_association_endpoint", "/kubernetes/pod-to-service-env-map", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_endpoint), + "endpoint to connect with when performing pod to service name association" + }, + /* + * The port for pod to service name association endpoint, default is 4311 + * Will only check when "use_pod_association" config is set to true + */ + { + FLB_CONFIG_MAP_INT, "aws_pod_association_port", "4311", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_port), + "port to connect with when performing pod to service name association" + }, + { + FLB_CONFIG_MAP_INT, "aws_pod_service_map_ttl", "0", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_service_map_ttl), + "configurable TTL for pod to service map storage. " + "By default, it is set to 0 which means TTL for cache entries is disabled and " + "cache entries are evicted at random when capacity is reached. " + "In order to enable this option, you should set the number to a time interval. " + "For example, set this value to 60 or 60s and cache entries " + "which have been created more than 60s will be evicted" + }, + { + FLB_CONFIG_MAP_INT, "aws_pod_service_map_refresh_interval", "60", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_service_map_refresh_interval), + "Refresh interval for the pod to service map storage." 
+ "By default, it is set to refresh every 60 seconds" + }, + { + FLB_CONFIG_MAP_STR, "aws_pod_service_preload_cache_dir", NULL, + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_service_preload_cache_path), + "set directory with pod to service map files" + }, + { + FLB_CONFIG_MAP_STR, "aws_pod_association_host_server_ca_file", "/etc/amazon-cloudwatch-observability-agent-server-cert/tls-ca.crt", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_host_server_ca_file), + "TLS CA certificate path for communication with agent server" + }, + { + FLB_CONFIG_MAP_STR, "aws_pod_association_host_client_cert_file", "/etc/amazon-cloudwatch-observability-agent-client-cert/client.crt", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_host_client_cert_file), + "Client Certificate path for enabling mTLS on calls to agent server" + }, + { + FLB_CONFIG_MAP_STR, "aws_pod_association_host_client_key_file", "/etc/amazon-cloudwatch-observability-agent-client-cert/client.key", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_host_client_key_file), + "Client Certificate Key path for enabling mTLS on calls to agent server" + }, + { + FLB_CONFIG_MAP_INT, "aws_pod_association_host_tls_debug", "0", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_host_tls_debug), + "set TLS debug level: 0 (no debug), 1 (error), " + "2 (state change), 3 (info) and 4 (verbose)" + }, + { + FLB_CONFIG_MAP_BOOL, "aws_pod_association_host_tls_verify", "true", + 0, FLB_TRUE, offsetof(struct flb_kube, aws_pod_association_host_tls_verify), + "enable or disable verification of TLS peer certificate" + }, + { + FLB_CONFIG_MAP_STR, "set_platform", NULL, + 0, FLB_TRUE, offsetof(struct flb_kube, set_platform), + "Set the platform that kubernetes is in. 
Possible values are k8s and eks" + "This should only be used for testing purpose" + }, /* EOF */ {0} }; diff --git a/plugins/filter_kubernetes/kubernetes_aws.c b/plugins/filter_kubernetes/kubernetes_aws.c new file mode 100644 index 00000000000..0b875553615 --- /dev/null +++ b/plugins/filter_kubernetes/kubernetes_aws.c @@ -0,0 +1,312 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2025 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include + +#include "kube_conf.h" +#include "kube_meta.h" +#include "fluent-bit/flb_http_client.h" +#include "fluent-bit/flb_output_plugin.h" +#include "fluent-bit/flb_pack.h" +#include "fluent-bit/flb_upstream_conn.h" +/* + * If a file exists called service.map, load it and use it. + * If not, fall back to API. 
This is primarily for unit tests purposes, + */ +static int get_pod_service_file_info(struct flb_kube *ctx, char **buffer) +{ + int fd = -1; + char *payload = NULL; + size_t payload_size = 0; + struct stat sb; + int packed = -1; + int ret; + char uri[1024]; + + if (ctx->aws_pod_service_preload_cache_path) { + + ret = snprintf(uri, sizeof(uri) - 1, "%s.map", + ctx->aws_pod_service_preload_cache_path); + if (ret > 0) { + fd = open(uri, O_RDONLY, 0); + if (fd == -1) { + flb_errno(); + return -1; + } + if (fstat(fd, &sb) == 0) { + payload = flb_malloc(sb.st_size); + if (!payload) { + flb_errno(); + return -1; + } + else { + ret = read(fd, payload, sb.st_size); + if (ret == sb.st_size) { + payload_size = ret; + } + } + } + close(fd); + } + + if (payload_size) { + *buffer=payload; + packed = payload_size; + flb_plg_debug(ctx->ins, "pod to service map content is: %s", buffer); + } + } + + return packed; +} + +static void extract_service_attribute(msgpack_object *attr_map, const char *key_name, + char *dest, int max_len, int *dest_len, int *fields) +{ + struct flb_record_accessor *ra; + struct flb_ra_value *val; + const char *str_val; + size_t str_len; + + ra = flb_ra_create((char *)key_name, FLB_FALSE); + if (!ra) { + return; + } + + val = flb_ra_get_value_object(ra, *attr_map); + if (val && val->type == FLB_RA_STRING) { + str_val = flb_ra_value_buffer(val, &str_len); + if (str_val && str_len < max_len) { + memcpy(dest, str_val, str_len); + dest[str_len] = '\0'; + *dest_len = str_len; + (*fields)++; + } + flb_ra_key_value_destroy(val); + } + flb_ra_destroy(ra); +} + +static void parse_pod_service_map(struct flb_kube *ctx, char *api_buf, + size_t api_size, pthread_mutex_t *mutex) +{ + if (ctx->hash_table == NULL || ctx->aws_pod_service_hash_table == NULL) { + return; + } + flb_plg_debug(ctx->ins, "started parsing pod to service map"); + + size_t off = 0; + int ret; + msgpack_unpacked api_result; + msgpack_object api_map, k, v; + struct service_attributes *attrs; + char 
*buffer = NULL; + size_t size; + int root_type; + int i; + + if (api_buf == NULL) { + return; + } + + ret = flb_pack_json(api_buf, api_size, &buffer, &size, &root_type, NULL); + if (ret < 0) { + flb_plg_warn(ctx->ins, "Could not parse json response = %s", api_buf); + if (buffer) { + flb_free(buffer); + } + return; + } + + msgpack_unpacked_init(&api_result); + ret = msgpack_unpack_next(&api_result, buffer, size, &off); + if (ret != MSGPACK_UNPACK_SUCCESS) { + goto cleanup; + } + + api_map = api_result.data; + for (i = 0; i < api_map.via.map.size; i++) { + k = api_map.via.map.ptr[i].key; + v = api_map.via.map.ptr[i].val; + + if (k.type != MSGPACK_OBJECT_STR || v.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "key and values are not string and map"); + continue; + } + char *pod_name = flb_strndup(k.via.str.ptr, k.via.str.size); + if (!pod_name) { + flb_free(pod_name); + continue; + } + attrs = flb_calloc(1, sizeof(struct service_attributes)); + if (!attrs) { + flb_errno(); + flb_free(pod_name); + continue; + } + extract_service_attribute(&v, "$ServiceName", attrs->name, + KEY_ATTRIBUTES_MAX_LEN, &attrs->name_len, &attrs->fields); + extract_service_attribute(&v, "$Environment", attrs->environment, + KEY_ATTRIBUTES_MAX_LEN, &attrs->environment_len, &attrs->fields); + extract_service_attribute(&v, "$ServiceNameSource", attrs->name_source, + SERVICE_NAME_SOURCE_MAX_LEN, &attrs->name_source_len, &attrs->fields); + if (attrs->name[0] != '\0' || attrs->environment[0] != '\0') { + pthread_mutex_lock(mutex); + flb_hash_table_add(ctx->aws_pod_service_hash_table, pod_name, k.via.str.size, + attrs, sizeof(struct service_attributes)); + pthread_mutex_unlock(mutex); + } + + flb_free(attrs); + flb_free(pod_name); + } + +cleanup: + flb_plg_debug(ctx->ins, "ended parsing pod to service map"); + msgpack_unpacked_destroy(&api_result); + if (buffer) { + flb_free(buffer); + } +} + +int fetch_pod_service_map(struct flb_kube *ctx, char *api_server_url, + pthread_mutex_t *mutex) +{ 
+ if (!ctx->aws_use_pod_association) { + return -1; + } + int ret; + struct flb_http_client *c; + size_t b_sent; + struct flb_upstream_conn *u_conn; + char *buffer = {0}; + + flb_plg_debug(ctx->ins, "fetch pod to service map"); + + ret = get_pod_service_file_info(ctx, &buffer); + if (ret > 0 && buffer != NULL) { + parse_pod_service_map(ctx, buffer, ret, mutex); + flb_free(buffer); + } + else { + /* + * if block handles the TLS certificates update, as the Fluent-bit connection + * gets net timeout error, it destroys the upstream. On the next call to + * fetch_pod_service_map, it creates a new pod association upstream with + * latest TLS certs + */ + if (!ctx->aws_pod_association_upstream) { + flb_plg_debug(ctx->ins, "[kubernetes] upstream object for pod association" + " is NULL. Making a new one now"); + ret = flb_kube_pod_association_init(ctx,ctx->config); + if (ret == -1) { + return -1; + } + } + + u_conn = flb_upstream_conn_get(ctx->aws_pod_association_upstream); + if (!u_conn) { + flb_plg_error(ctx->ins, "[kubernetes] no upstream connections available to %s:%i", + ctx->aws_pod_association_upstream->tcp_host, + ctx->aws_pod_association_upstream->tcp_port); + flb_upstream_destroy(ctx->aws_pod_association_upstream); + flb_tls_destroy(ctx->aws_pod_association_tls); + ctx->aws_pod_association_upstream = NULL; + ctx->aws_pod_association_tls = NULL; + return -1; + } + + /* Create HTTP client */ + c = flb_http_client(u_conn, FLB_HTTP_GET, + api_server_url, + NULL, 0, ctx->aws_pod_association_host, + ctx->aws_pod_association_port, NULL, 0); + + if (!c) { + flb_error("[kubernetes] could not create HTTP client"); + flb_upstream_conn_release(u_conn); + flb_upstream_destroy(ctx->aws_pod_association_upstream); + flb_tls_destroy(ctx->aws_pod_association_tls); + ctx->aws_pod_association_upstream = NULL; + ctx->aws_pod_association_tls = NULL; + return -1; + } + + /* Perform HTTP request */ + ret = flb_http_do(c, &b_sent); + flb_plg_debug(ctx->ins, "Request (uri = %s) http_do=%i, 
" + "HTTP Status: %i", + api_server_url, ret, c->resp.status); + + if (ret != 0 || c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "HTTP response : %s", + c->resp.payload); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* Parse response data */ + if (c->resp.payload != NULL) { + flb_plg_debug(ctx->ins, "HTTP response payload : %s", + c->resp.payload); + parse_pod_service_map(ctx, c->resp.payload, c->resp.payload_size, mutex); + } + + /* Cleanup */ + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + } + return 0; +} + +/* Determine platform by checking aws-auth configmap */ +int determine_platform(struct flb_kube *ctx) +{ + int ret; + char *config_buf; + size_t config_size; + + ret = get_api_server_configmap(ctx, KUBE_SYSTEM_NAMESPACE, AWS_AUTH_CONFIG_MAP, &config_buf, &config_size); + if (ret != -1) { + flb_free(config_buf); + return 1; + } + return -1; +} + +/* Gather pods list information from Kubelet */ +void get_cluster_from_environment(struct flb_kube *ctx, struct flb_kube_meta *meta) +{ + if (meta->cluster == NULL) { + char* cluster_name = getenv("CLUSTER_NAME"); + if (cluster_name) { + meta->cluster = strdup(cluster_name); + meta->cluster_len = strlen(cluster_name); + meta->fields++; + flb_plg_debug(ctx->ins, "Cluster name is %s.", meta->cluster); + } + } +} diff --git a/plugins/filter_kubernetes/kubernetes_aws.h b/plugins/filter_kubernetes/kubernetes_aws.h new file mode 100644 index 00000000000..f5d3c430798 --- /dev/null +++ b/plugins/filter_kubernetes/kubernetes_aws.h @@ -0,0 +1,10 @@ +#ifndef KUBERNETES_AWS_H +#define KUBERNETES_AWS_H + +#include "kube_conf.h" + +int fetch_pod_service_map(struct flb_kube *ctx, char *api_server_url, pthread_mutex_t *mutex); +int determine_platform(struct flb_kube *ctx); +void get_cluster_from_environment(struct flb_kube *ctx, struct flb_kube_meta *meta); + +#endif