Skip to content

Commit

Permalink
kubernetes url to API is now part of config. Added documentation abou…
Browse files Browse the repository at this point in the history
…t it
  • Loading branch information
AngeCyp committed Jul 3, 2024
1 parent 6c3b964 commit 02de29c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 16 deletions.
24 changes: 23 additions & 1 deletion plugin-k8s/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,31 @@ If it doesn't detect the use of cgroupv2, the plugin will not start.
In this version of the plugin, to gather some data about pods and nodes, the plugin uses kubectl. So make sure that kubectl is installed and usable.
[How to install kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/)

Using kubectl, you can retrieve the Kubernetes API URL.
Run:

```bash
kubectl config view -o jsonpath='{"Cluster name\tServer\n"}{range .clusters[*]}{.name}{"\t"}{.cluster.server}{"\n"}{end}'
```

Note the URL corresponding to the Kubernetes cluster you want to interact with.

Example:

```bash
Cluster name Server
kubernetes https://1.2.3.4:6443
```

Take the URL of the cluster you want (here, the **kubernetes** entry) and write it in the **alumet-config.toml** file.
Under the section **[plugins.k8s]**, add the following:
> **kubernetes_api_url = "https://1.2.3.4:6443"**
By default, the value of **kubernetes_api_url** is set to **https://127.0.0.1:8080**.

## alumet-reader

To use the Kubernetes API an user is needed. It's the alumet-reader user. Make sur the user exist and have the good rights.
To use the Kubernetes API, a user is needed: the **alumet-reader** user. Make sure the user exists and has the appropriate rights.
You can use the yaml file: [alumet-user.yaml](./alumet-user.yaml) to create this user.
Run:

Expand Down
50 changes: 39 additions & 11 deletions plugin-k8s/src/cgroup_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ pub fn is_accessible_dir(path: &Path) -> bool {
}

/// Returns a Vector of CgroupV2MetricFile associated to pods availables under a given directory.
fn list_metric_file_in_dir(root_directory_path: &Path, hostname: String) -> anyhow::Result<Vec<CgroupV2MetricFile>> {
fn list_metric_file_in_dir(
root_directory_path: &Path,
hostname: String,
kubernetes_api_url: String,
) -> anyhow::Result<Vec<CgroupV2MetricFile>> {
let mut vec_file_metric: Vec<CgroupV2MetricFile> = Vec::new();
let entries = fs::read_dir(root_directory_path)?;
// Let's create a runtime to await async function and fullfill hashmap
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
let main_hash_map: HashMap<String, (String, String, String)> =
rt.block_on(async { get_existing_pods(hostname).await })?;
rt.block_on(async { get_existing_pods(hostname, kubernetes_api_url).await })?;

// For each File in the root path
for entry in entries {
Expand Down Expand Up @@ -126,7 +130,11 @@ fn list_metric_file_in_dir(root_directory_path: &Path, hostname: String) -> anyh
/// This function list all k8s pods availables, using sub-directories to look in:
/// For each subdirectory, we look in if there is a directory/ies about pods and we add it
/// to a vector. All subdirectory are visited with the help of <list_metric_file_in_dir> function.
pub fn list_all_k8s_pods_file(root_directory_path: &Path, hostname: String) -> anyhow::Result<Vec<CgroupV2MetricFile>> {
pub fn list_all_k8s_pods_file(
root_directory_path: &Path,
hostname: String,
kubernetes_api_url: String,
) -> anyhow::Result<Vec<CgroupV2MetricFile>> {
let mut final_li_metric_file: Vec<CgroupV2MetricFile> = Vec::new();
if !root_directory_path.exists() {
return Ok(final_li_metric_file);
Expand All @@ -148,7 +156,7 @@ pub fn list_all_k8s_pods_file(root_directory_path: &Path, hostname: String) -> a
}

for prefix in all_sub_dir {
let mut result_vec = list_metric_file_in_dir(&prefix.to_owned(), hostname.clone())?;
let mut result_vec = list_metric_file_in_dir(&prefix.to_owned(), hostname.clone(), kubernetes_api_url.clone())?;
final_li_metric_file.append(&mut result_vec);
}
return Ok(final_li_metric_file);
Expand All @@ -171,7 +179,10 @@ pub fn gather_value(file: &mut CgroupV2MetricFile, content_buffer: &mut String)
}

/// Returns a HashMap where the key is the uid used and the value is a tuple containing it's name, namespace and node
pub async fn get_existing_pods(node: String) -> anyhow::Result<HashMap<String, (String, String, String)>> {
pub async fn get_existing_pods(
node: String,
kubernetes_api_url: String,
) -> anyhow::Result<HashMap<String, (String, String, String)>> {
let Ok(output) = Command::new("kubectl")
.args(&["create", "token", "alumet-reader"])
.output()
Expand All @@ -181,11 +192,18 @@ pub async fn get_existing_pods(node: String) -> anyhow::Result<HashMap<String, (

let token = String::from_utf8_lossy(&output.stdout);
let token = token.trim();
let api_url_root = "https://10.22.80.14:6443/api/v1/pods/";
if kubernetes_api_url == "" {
return Ok(HashMap::new());
}
let mut api_url_root = kubernetes_api_url.clone();
api_url_root.push_str("/api/v1/pods/");
// let api_url_root = "https://10.22.80.14:6443/api/v1/pods/";
let mut selector = false;
let api_url = if node == "" {
api_url_root.to_owned()
} else {
let tmp = format!("{}?fieldSelector=spec.nodeName={}", api_url_root, node);
selector = true;
tmp
};
let mut headers = header::HeaderMap::new();
Expand All @@ -210,7 +228,7 @@ pub async fn get_existing_pods(node: String) -> anyhow::Result<HashMap<String, (
// let's check if the items' part contain pods to look at
if let Some(items) = data.get("items") {
let size = items.as_array().unwrap_or(&vec![]).len(); // If the node was not found i.e. no item in the response, we call the API again with all nodes
if size == 0 {
if size == 0 && selector {
// Ask again the api, with all nodes
let Ok(response) = client.get(api_url_root).send().await else {
return Ok(HashMap::new());
Expand Down Expand Up @@ -265,7 +283,11 @@ pub async fn get_existing_pods(node: String) -> anyhow::Result<HashMap<String, (
}

/// Reads files in a filesystem to associate a cgroup of a poduid to a kubernetes pod name
pub async fn get_pod_name(uid: String, node: String) -> anyhow::Result<(String, String, String)> {
pub async fn get_pod_name(
uid: String,
node: String,
kubernetes_api_url: String,
) -> anyhow::Result<(String, String, String)> {
let new_uid = uid.replace("_", "-");
let Ok(output) = Command::new("kubectl")
.args(&["create", "token", "alumet-reader"])
Expand All @@ -276,11 +298,17 @@ pub async fn get_pod_name(uid: String, node: String) -> anyhow::Result<(String,

let token = String::from_utf8_lossy(&output.stdout);
let token = token.trim();
let api_url_root = "https://10.22.80.14:6443/api/v1/pods/";
if kubernetes_api_url == "" {
return Ok(("".to_string(), "".to_string(), "".to_string()));
}
let mut api_url_root = kubernetes_api_url.clone();
api_url_root.push_str("/api/v1/pods/");
let mut selector = false;
let api_url = if node == "" {
api_url_root.to_owned()
} else {
let tmp = format!("{}?fieldSelector=spec.nodeName={}", api_url_root, node);
selector = true;
tmp
};
let mut headers = header::HeaderMap::new();
Expand All @@ -305,7 +333,7 @@ pub async fn get_pod_name(uid: String, node: String) -> anyhow::Result<(String,
// let's check if the items' part contain pods to look at
if let Some(items) = data.get("items") {
let size = items.as_array().unwrap_or(&vec![]).len(); // If the node was not found i.e. no item in the response, we call the API again with all nodes
if size == 0 {
if size == 0 && selector {
// Ask again the api, with all nodes
let Ok(response) = client.get(api_url_root).send().await else {
return Ok(("".to_string(), "".to_string(), "".to_string()));
Expand Down Expand Up @@ -398,7 +426,7 @@ mod tests {
std::fs::write(c.join("cpu.stat"), "sv").unwrap();
std::fs::write(d.join("cpu.stat"), "ne").unwrap();
let li_met_file: anyhow::Result<Vec<CgroupV2MetricFile>> =
list_metric_file_in_dir(&burstable_dir, "".to_string());
list_metric_file_in_dir(&burstable_dir, "".to_string(), "".to_string());
let list_pod_name = [
"pod32a1942cb9a81912549c152a49b5f9b1",
"podd9209de2b4b526361248c9dcf3e702c0",
Expand Down
18 changes: 14 additions & 4 deletions plugin-k8s/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct Config {
/// Initial interval between two cgroup measurements.
#[serde(with = "humantime_serde")]
poll_interval: Duration,
kubernetes_api_url: String,
}

impl AlumetPlugin for K8sPlugin {
Expand Down Expand Up @@ -74,7 +75,7 @@ impl AlumetPlugin for K8sPlugin {
.to_string();

let final_list_metric_file: Vec<CgroupV2MetricFile> =
cgroup_v2::list_all_k8s_pods_file(&self.config.path, hostname)?;
cgroup_v2::list_all_k8s_pods_file(&self.config.path, hostname, self.config.kubernetes_api_url.clone())?;

//Add as a source each pod already present
for metric_file in final_list_metric_file {
Expand Down Expand Up @@ -105,11 +106,13 @@ impl AlumetPlugin for K8sPlugin {
// let metrics = self.metrics.clone().unwrap();
let metrics = self.metrics.clone().with_context(|| "Metrics is not available")?;
let poll_interval = self.config.poll_interval;
let kubernetes_api_url = self.config.kubernetes_api_url.clone();
struct PodDetector {
plugin_name: String,
metrics: Metrics,
control_handle: ControlHandle,
poll_interval: Duration,
kubernetes_api_url: String,
}

impl EventHandler for PodDetector {
Expand Down Expand Up @@ -170,9 +173,14 @@ impl AlumetPlugin for K8sPlugin {
return;
}
};
let (name, namespace, node) = match rt
.block_on(async { cgroup_v2::get_pod_name(name_to_seek.to_owned(), hostname).await })
{
let (name, namespace, node) = match rt.block_on(async {
cgroup_v2::get_pod_name(
name_to_seek.to_owned(),
hostname,
self.kubernetes_api_url.clone(),
)
.await
}) {
Ok(tuple_found) => tuple_found,
Err(err) => {
log::error!("Block on failed returned an error: {}", err);
Expand Down Expand Up @@ -233,6 +241,7 @@ impl AlumetPlugin for K8sPlugin {
metrics: metrics,
control_handle: control_handle,
poll_interval: poll_interval,
kubernetes_api_url: kubernetes_api_url,
};

let mut watcher = notify::recommended_watcher(handler)?;
Expand All @@ -250,6 +259,7 @@ impl Default for Config {
Self {
path: root_path,
poll_interval: Duration::from_secs(1), // 1Hz
kubernetes_api_url: String::from("https://127.0.0.1:8080"),
}
}
}

0 comments on commit 02de29c

Please sign in to comment.