Modify Agent to reduce frequency of Pods getting UnexpectedAdmissionError #556

Merged
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion agent/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.9.3"
version = "0.9.4"
authors = ["Kate Goldenring <[email protected]>", "<[email protected]>"]
edition = "2018"
rust-version = "1.63.0"
112 changes: 48 additions & 64 deletions agent/src/util/device_plugin_service.rs
@@ -342,27 +342,28 @@ impl DevicePluginService {
    }
}

-/// This returns the value that should be inserted at `device_usage_id` slot for an instance else an error.
+/// This returns true if this node can reserve a `device_usage_id` slot for an instance
+/// and false if it is already reserved.
/// # More details
/// Cases based on the usage slot (`device_usage_id`) value
/// 1. device_usage\[id\] == "" ... this means that the device is available for use
-///     * <ACTION> return this node name
+///     * <ACTION> return true
/// 2. device_usage\[id\] == self.nodeName ... this means THIS node previously used id, but the DevicePluginManager knows that this is no longer true
-///     * <ACTION> return ""
+///     * <ACTION> return false
/// 3. device_usage\[id\] == <some other node> ... this means that we believe this device is in use by another node and should be marked unhealthy
///     * <ACTION> return error
/// 4. No corresponding id found ... this is an unknown error condition (BAD)
///     * <ACTION> return error
-fn get_slot_value(
+fn slot_available_to_reserve(
    device_usage_id: &str,
    node_name: &str,
    instance: &InstanceSpec,
-) -> Result<String, Status> {
+) -> Result<bool, Status> {
    if let Some(allocated_node) = instance.device_usage.get(device_usage_id) {
        if allocated_node.is_empty() {
-            Ok(node_name.to_string())
+            Ok(true)
        } else if allocated_node == node_name {
-            Ok("".to_string())
+            Ok(false)
        } else {
            trace!("internal_allocate - request for device slot {} previously claimed by a diff node {} than this one {} ... indicates the device on THIS node must be marked unhealthy, invoking ListAndWatch ... returning failure, next scheduling should succeed!", device_usage_id, allocated_node, node_name);
            Err(Status::new(
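
Note: to make the new contract concrete, here is a hypothetical test sketch (not part of this PR) that exercises the four documented cases. It assumes `InstanceSpec` exposes the `device_usage` map of slot id to node name used above and that its remaining fields can be defaulted; adjust the construction to the real struct.

    #[test]
    fn slot_available_to_reserve_cases() {
        let mut device_usage = std::collections::HashMap::new();
        device_usage.insert("instance-0".to_string(), "".to_string()); // case 1: free slot
        device_usage.insert("instance-1".to_string(), "node-a".to_string()); // case 2: held by this node
        device_usage.insert("instance-2".to_string(), "node-b".to_string()); // case 3: held by another node
        // Hypothetical construction; assumes the remaining InstanceSpec fields can be defaulted.
        let instance = InstanceSpec { device_usage, ..Default::default() };

        // Case 1: an empty slot can be reserved by this node.
        assert!(slot_available_to_reserve("instance-0", "node-a", &instance).unwrap());
        // Case 2: a slot already held by this node needs no further update.
        assert!(!slot_available_to_reserve("instance-1", "node-a", &instance).unwrap());
        // Case 3: a slot held by another node is an error.
        assert!(slot_available_to_reserve("instance-2", "node-a", &instance).is_err());
        // Case 4: an unknown slot id is an error.
        assert!(slot_available_to_reserve("unknown-slot", "node-a", &instance).is_err());
    }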
@@ -383,7 +384,7 @@ fn get_slot_value(
    }
}

-/// This tries up to `MAX_INSTANCE_UPDATE_TRIES` to update the requested slot of the Instance with the appropriate value (either "" to clear slot or node_name).
+/// This tries up to `MAX_INSTANCE_UPDATE_TRIES` to update the requested slot of the Instance with this node's name.
/// It cannot be assumed that this will successfully update Instance on first try since Device Plugins on other nodes may be simultaneously trying to update the Instance.
/// This returns an error if slot does not need to be updated or `MAX_INSTANCE_UPDATE_TRIES` attempted.
async fn try_update_instance_device_usage(
@@ -413,36 +414,28 @@ async fn try_update_instance_device_usage(
            }
        }

-        // at this point, `value` should either be:
-        // * `node_name`: meaning that this node is claiming this slot
-        // * "": meaning this node previously claimed this slot, but kubelet
-        //   knows that claim is no longer valid. In this case, reset the
-        //   slot (which triggers each node to set the slot as Healthy) to
-        //   allow a fair rescheduling of the workload
-        let value = get_slot_value(device_usage_id, node_name, &instance)?;
-        instance
-            .device_usage
-            .insert(device_usage_id.to_string(), value.clone());
+        // Update the instance to reserve this slot for this node iff it is available and not already reserved for this node.
+        if slot_available_to_reserve(device_usage_id, node_name, &instance)? {
+            instance
+                .device_usage
+                .insert(device_usage_id.to_string(), node_name.to_string());

-        match kube_interface
-            .update_instance(&instance, instance_name, instance_namespace)
-            .await
-        {
-            Ok(()) => {
-                if value == node_name {
-                    return Ok(());
-                } else {
-                    return Err(Status::new(Code::Unknown, "Devices are in inconsistent state, updated device usage, please retry scheduling"));
-                }
-            }
-            Err(e) => {
+            if let Err(e) = kube_interface
+                .update_instance(&instance, instance_name, instance_namespace)
+                .await
+            {
                if x == (MAX_INSTANCE_UPDATE_TRIES - 1) {
                    trace!("internal_allocate - update_instance returned error [{}] after max tries ... returning error", e);
                    return Err(Status::new(Code::Unknown, "Could not update Instance"));
                }
-                random_delay().await;
+            } else {
+                return Ok(());
            }
+        } else {
+            // Instance slot already reserved for this node
+            return Ok(());
        }
+        random_delay().await;
    }
    Ok(())
}
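
The retry loop above leans on `random_delay()` to jitter retries so that agents on different nodes stop colliding on the same Instance update. The helper's implementation is outside this diff; a minimal sketch of such a jittered delay (crate choice and bounds assumed, not taken from the Akri source) could look like:

    use rand::Rng;
    use std::time::Duration;

    // Sleep a random 0-200 ms (bounds assumed for illustration) so that
    // concurrent agents retrying the same Instance update are unlikely to
    // collide again on the next attempt.
    async fn random_delay() {
        let millis: u64 = rand::thread_rng().gen_range(0..200);
        tokio::time::sleep(Duration::from_millis(millis)).await;
    }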
@@ -1331,7 +1324,7 @@ mod device_plugin_service_tests {
        mock: &mut MockKubeInterface,
        device_plugin_service: &DevicePluginService,
        formerly_allocated_node: String,
-        newly_allocated_node: String,
+        newly_allocated_node: Option<String>,
    ) -> Request<AllocateRequest> {
        let device_usage_id_slot = format!("{}-0", device_plugin_service.instance_name);
        let device_usage_id_slot_2 = device_usage_id_slot.clone();
@@ -1343,16 +1336,18 @@
            formerly_allocated_node,
            NodeName::ThisNode,
        );
-        mock.expect_update_instance()
-            .times(1)
-            .withf(move |instance_to_update: &InstanceSpec, _, _| {
-                instance_to_update
-                    .device_usage
-                    .get(&device_usage_id_slot)
-                    .unwrap()
-                    == &newly_allocated_node
-            })
-            .returning(move |_, _, _| Ok(()));
+        if let Some(new_node) = newly_allocated_node {
+            mock.expect_update_instance()
+                .times(1)
+                .withf(move |instance_to_update: &InstanceSpec, _, _| {
+                    instance_to_update
+                        .device_usage
+                        .get(&device_usage_id_slot)
+                        .unwrap()
+                        == &new_node
+                })
+                .returning(move |_, _, _| Ok(()));
+        }
        let devices_i_ds = vec![device_usage_id_slot_2];
        let container_requests = vec![v1beta1::ContainerAllocateRequest { devices_i_ds }];
        Request::new(AllocateRequest { container_requests })
@@ -1370,7 +1365,7 @@
            &mut mock,
            &device_plugin_service,
            String::new(),
-            node_name,
+            Some(node_name),
        );
        let broker_envs = device_plugin_service
            .internal_allocate(request, Arc::new(mock))
@@ -1404,7 +1399,7 @@
            &mut mock,
            &device_plugin_service,
            String::new(),
-            node_name,
+            Some(node_name),
        );
        assert!(device_plugin_service
            .internal_allocate(request, Arc::new(mock),)
@@ -1417,7 +1412,8 @@
    }

    // Test when device_usage[id] == self.nodeName
-    // Expected behavior: internal_allocate should set device_usage[id] == "", invoke list_and_watch, and return error
+    // Expected behavior: internal_allocate should keep device_usage[id] == self.nodeName and
+    // instance should not be updated
    #[tokio::test]
    async fn test_internal_allocate_deallocate() {
        let _ = env_logger::builder().is_test(true).try_init();
@@ -1428,28 +1424,16 @@
            &mut mock,
            &device_plugin_service,
            "node-a".to_string(),
-            String::new(),
+            None,
        );
-        match device_plugin_service
+        assert!(device_plugin_service
            .internal_allocate(request, Arc::new(mock))
            .await
-        {
-            Ok(_) => {
-                panic!("internal allocate is expected to fail due to devices being in bad state")
-            }
-            Err(e) => assert_eq!(
-                e.message(),
-                "Devices are in inconsistent state, updated device usage, please retry scheduling"
-            ),
-        }
-        assert_eq!(
-            device_plugin_service_receivers
-                .list_and_watch_message_receiver
-                .recv()
-                .await
-                .unwrap(),
-            ListAndWatchMessageKind::Continue
-        );
+            .is_ok());
+        assert!(device_plugin_service_receivers
+            .list_and_watch_message_receiver
+            .try_recv()
+            .is_err());
    }
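
The rewritten test asserts that no ListAndWatch message was queued by calling the non-blocking `try_recv()`, which returns an `Err` on an empty channel, where the old `recv().await` would have blocked until a message arrived. A standalone illustration of that difference, using a hypothetical tokio mpsc channel rather than this PR's types:

    use tokio::sync::mpsc;

    #[tokio::test]
    async fn try_recv_is_non_blocking() {
        let (tx, mut rx) = mpsc::channel::<u32>(1);
        // Nothing has been sent yet, so try_recv() fails immediately instead of waiting.
        assert!(rx.try_recv().is_err());
        tx.send(7).await.unwrap();
        assert_eq!(rx.try_recv().unwrap(), 7);
    }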

// Tests when device_usage[id] == <another node>
2 changes: 1 addition & 1 deletion controller/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "controller"
version = "0.9.3"
version = "0.9.4"
authors = ["<[email protected]>", "<[email protected]>"]
edition = "2018"
rust-version = "1.63.0"
4 changes: 2 additions & 2 deletions deployment/helm/Chart.yaml
@@ -15,9 +15,9 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.9.3
+version: 0.9.4

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
-appVersion: 0.9.3
+appVersion: 0.9.4
@@ -1,6 +1,6 @@
[package]
name = "debug-echo-discovery-handler"
version = "0.9.3"
version = "0.9.4"
authors = ["Kate Goldenring <[email protected]>"]
edition = "2018"
rust-version = "1.63.0"
@@ -1,6 +1,6 @@
[package]
name = "onvif-discovery-handler"
version = "0.9.3"
version = "0.9.4"
authors = ["Kate Goldenring <[email protected]>"]
edition = "2018"
rust-version = "1.63.0"
@@ -1,6 +1,6 @@
[package]
name = "opcua-discovery-handler"
version = "0.9.3"
version = "0.9.4"
authors = ["Kate Goldenring <[email protected]>"]
edition = "2018"
rust-version = "1.63.0"
@@ -1,6 +1,6 @@
[package]
name = "udev-discovery-handler"
version = "0.9.3"
version = "0.9.4"
authors = ["Kate Goldenring <[email protected]>"]
edition = "2018"
rust-version = "1.63.0"
2 changes: 1 addition & 1 deletion discovery-handlers/debug-echo/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "akri-debug-echo"
version = "0.9.3"
version = "0.9.4"
authors = ["Kate Goldenring <[email protected]>"]
edition = "2018"
rust-version = "1.63.0"