Skip to content

Commit

Permalink
fix(agent): correct agent's behavior on Instance deletion
Browse files Browse the repository at this point in the history
Make the agent remove itself from the instance nodes list instead of
deleting the instance if it's not the last node with access to the
device.

Refactored the way we get the AGENT_NODE_NAME to only fetch it once at
startup.

This fixes #650.

Signed-off-by: Nicolas Belouin <[email protected]>
  • Loading branch information
diconico07 committed Sep 7, 2023
1 parent 209d7df commit 9216d66
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 67 deletions.
12 changes: 9 additions & 3 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use log::{info, trace};
use prometheus::{HistogramVec, IntGaugeVec};
use std::{
collections::HashMap,
env,
sync::{Arc, Mutex},
time::Duration,
};
Expand Down Expand Up @@ -52,6 +53,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
);

let mut tasks = Vec::new();
let node_name = env::var("AGENT_NODE_NAME")?;

// Start server for Prometheus metrics
tasks.push(tokio::spawn(async move {
Expand Down Expand Up @@ -83,9 +85,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
}));

tasks.push(tokio::spawn(async move {
config_action::do_config_watch(discovery_handler_map, new_discovery_handler_sender_clone)
.await
.unwrap()
config_action::do_config_watch(
discovery_handler_map,
new_discovery_handler_sender_clone,
node_name,
)
.await
.unwrap()
}));

futures::future::try_join_all(tasks).await?;
Expand Down
15 changes: 15 additions & 0 deletions agent/src/util/config_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub struct ConfigInfo {
pub async fn do_config_watch(
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
info!("do_config_watch - enter");
let config_map: ConfigMap = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -63,13 +64,15 @@ pub async fn do_config_watch(
let discovery_handler_map = discovery_handler_map.clone();
let new_discovery_handler_sender = new_discovery_handler_sender.clone();
let new_kube_interface = kube_interface.clone();
let new_node_name = node_name.clone();
tasks.push(tokio::spawn(async move {
handle_config_add(
new_kube_interface,
&config,
config_map,
discovery_handler_map,
new_discovery_handler_sender,
new_node_name,
)
.await
.unwrap();
Expand All @@ -83,6 +86,7 @@ pub async fn do_config_watch(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await
.unwrap();
Expand All @@ -99,6 +103,7 @@ async fn watch_for_config_changes(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
trace!("watch_for_config_changes - start");
let resource = Api::<Configuration>::all(kube_interface.get_kube_client());
Expand All @@ -121,6 +126,7 @@ async fn watch_for_config_changes(
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender,
node_name.clone(),
)
.await?
}
Expand All @@ -135,6 +141,7 @@ async fn handle_config(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> anyhow::Result<()> {
trace!("handle_config - something happened to a configuration");
match event {
Expand All @@ -149,6 +156,7 @@ async fn handle_config(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await?;
}
Expand Down Expand Up @@ -186,6 +194,7 @@ async fn handle_config(
config_map.clone(),
discovery_handler_map.clone(),
new_discovery_handler_sender.clone(),
node_name.clone(),
)
.await?;
}
Expand All @@ -200,6 +209,7 @@ async fn handle_config_apply(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> anyhow::Result<()> {
// Applied events can either be newly added Configurations or modified Configurations.
// If modified delete all associated instances and device plugins and then recreate them to reflect updated config
Expand Down Expand Up @@ -231,6 +241,7 @@ async fn handle_config_apply(
config_map,
discovery_handler_map,
new_discovery_handler_sender,
node_name,
)
.await
.unwrap();
Expand All @@ -246,6 +257,7 @@ async fn handle_config_add(
config_map: ConfigMap,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
new_discovery_handler_sender: broadcast::Sender<String>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let config_id: ConfigId = (
config.metadata.namespace.clone().unwrap(),
Expand Down Expand Up @@ -276,6 +288,7 @@ async fn handle_config_add(
stop_discovery_sender,
&mut finished_discovery_sender,
kube_interface,
node_name,
)
.await
.unwrap();
Expand Down Expand Up @@ -446,6 +459,7 @@ mod config_action_tests {
config_map.clone(),
dh_map.clone(),
tx.clone(),
"node-a".to_string(),
)
.await
.is_ok());
Expand All @@ -461,6 +475,7 @@ mod config_action_tests {
config_map.clone(),
dh_map,
tx,
"node-a".to_string(),
)
.await
.is_ok());
Expand Down
19 changes: 13 additions & 6 deletions agent/src/util/device_plugin_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use super::{
KUBELET_SOCKET, LIST_AND_WATCH_MESSAGE_CHANNEL_CAPACITY,
},
device_plugin_service::{
ConfigurationDevicePlugin, DevicePluginBehavior, DevicePluginContext, DevicePluginService,
InstanceDevicePlugin, ListAndWatchMessageKind,
get_device_instance_name, ConfigurationDevicePlugin, DevicePluginBehavior,
DevicePluginContext, DevicePluginService, InstanceDevicePlugin, ListAndWatchMessageKind,
},
v1beta1,
v1beta1::{
Expand All @@ -24,7 +24,7 @@ use log::{info, trace};
#[cfg(test)]
use mockall::{automock, predicate::*};
use std::sync::Arc;
use std::{convert::TryFrom, env, path::Path, time::SystemTime};
use std::{convert::TryFrom, path::Path, time::SystemTime};
use tokio::{
net::UnixListener,
net::UnixStream,
Expand All @@ -39,19 +39,20 @@ use tower::service_fn;
pub trait DevicePluginBuilderInterface: Send + Sync {
async fn build_device_plugin(
&self,
instance_name: String,
instance_id: String,
config: &Configuration,
shared: bool,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device: Device,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;

async fn build_configuration_device_plugin(
&self,
device_plugin_name: String,
config: &Configuration,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
node_name: String,
) -> Result<
broadcast::Sender<ListAndWatchMessageKind>,
Box<dyn std::error::Error + Send + Sync + 'static>,
Expand All @@ -66,13 +67,15 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
/// This creates a new DevicePluginService for an instance and registers it with the kubelet
async fn build_device_plugin(
&self,
instance_name: String,
instance_id: String,
config: &Configuration,
shared: bool,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device: Device,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let instance_name =
get_device_instance_name(&instance_id, config.metadata.name.as_ref().unwrap());
info!("build_device_plugin - entered for device {}", instance_name);
let device_plugin_behavior = DevicePluginBehavior::Instance(InstanceDevicePlugin {
instance_id: instance_id.clone(),
Expand All @@ -87,6 +90,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_context,
device_plugin_behavior,
list_and_watch_message_sender,
node_name,
)
.await
}
Expand All @@ -97,6 +101,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_name: String,
config: &Configuration,
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
node_name: String,
) -> Result<
broadcast::Sender<ListAndWatchMessageKind>,
Box<dyn std::error::Error + Send + Sync + 'static>,
Expand All @@ -115,6 +120,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder {
device_plugin_context,
device_plugin_behavior,
list_and_watch_message_sender.clone(),
node_name,
)
.await?;
Ok(list_and_watch_message_sender)
Expand All @@ -129,6 +135,7 @@ impl DevicePluginBuilder {
device_plugin_context: Arc<RwLock<DevicePluginContext>>,
device_plugin_behavior: DevicePluginBehavior,
list_and_watch_message_sender: broadcast::Sender<ListAndWatchMessageKind>,
node_name: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let capability_id: String = format!("{}/{}", AKRI_PREFIX, device_plugin_name);
let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
Expand All @@ -147,7 +154,7 @@ impl DevicePluginBuilder {
config_name: config.metadata.name.clone().unwrap(),
config_uid: config.metadata.uid.as_ref().unwrap().clone(),
config_namespace: config.metadata.namespace.as_ref().unwrap().clone(),
node_name: env::var("AGENT_NODE_NAME")?,
node_name,
device_plugin_context,
list_and_watch_message_sender,
server_ender_sender: server_ender_sender.clone(),
Expand Down
Loading

0 comments on commit 9216d66

Please sign in to comment.