Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove all need for workers to know about ActionId #1125

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ message ConnectionResult {
reserved 2; // NextId.
}

/// Request to kill a running action sent from the scheduler to a worker.
message KillActionRequest {
/// The the hex encoded unique qualifier for the action to be killed.
string action_id = 1;
/// Request to kill a running operation sent from the scheduler to a worker.
message KillOperationRequest {
/// The the operation id for the operation to be killed.
string operation_id = 1;
reserved 2; // NextId.
}
/// Communication from the scheduler to the worker.
Expand All @@ -152,8 +152,8 @@ message UpdateForWorker {
/// The worker may discard any outstanding work that is being executed.
google.protobuf.Empty disconnect = 4;

/// Instructs the worker to kill a specific running action.
KillActionRequest kill_action_request = 5;
/// Instructs the worker to kill a specific running operation.
KillOperationRequest kill_operation_request = 5;
}
reserved 6; // NextId.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@ pub struct ConnectionResult {
#[prost(string, tag = "1")]
pub worker_id: ::prost::alloc::string::String,
}
/// / Request to kill a running action sent from the scheduler to a worker.
/// / Request to kill a running operation sent from the scheduler to a worker.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct KillActionRequest {
/// / The the hex encoded unique qualifier for the action to be killed.
pub struct KillOperationRequest {
/// / The the operation id for the operation to be killed.
#[prost(string, tag = "1")]
pub action_id: ::prost::alloc::string::String,
pub operation_id: ::prost::alloc::string::String,
}
/// / Communication from the scheduler to the worker.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -133,9 +133,9 @@ pub mod update_for_worker {
/// / The worker may discard any outstanding work that is being executed.
#[prost(message, tag = "4")]
Disconnect(()),
/// / Instructs the worker to kill a specific running action.
/// / Instructs the worker to kill a specific running operation.
#[prost(message, tag = "5")]
KillActionRequest(super::KillActionRequest),
KillOperationRequest(super::KillOperationRequest),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ fn update_eq(expected: UpdateForWorker, actual: UpdateForWorker, ignore_id: bool
}
_ => false,
},
update_for_worker::Update::KillActionRequest(actual_update) => match expected_update {
update_for_worker::Update::KillActionRequest(expected_update) => {
update_for_worker::Update::KillOperationRequest(actual_update) => match expected_update {
update_for_worker::Update::KillOperationRequest(expected_update) => {
expected_update == actual_update
}
_ => false,
Expand Down
33 changes: 21 additions & 12 deletions nativelink-worker/src/local_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,29 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
Update::KeepAlive(()) => {
self.metrics.keep_alives_received.inc();
}
Update::KillActionRequest(kill_action_request) => {
let mut action_id = [0u8; 32];
hex::decode_to_slice(kill_action_request.action_id, &mut action_id as &mut [u8])
.map_err(|e| make_input_err!(
"KillActionRequest failed to decode ActionId hex with error {}",
e
))?;

if let Err(err) = self.running_actions_manager.kill_action(action_id).await {
Update::KillOperationRequest(kill_operation_request) => {
let operation_id_res = kill_operation_request
.operation_id
.as_str()
.try_into();
let operation_id = match operation_id_res {
Ok(operation_id) => operation_id,
Err(err) => {
event!(
Level::ERROR,
?kill_operation_request,
?err,
"Failed to convert string to operation_id"
);
continue;
}
};
if let Err(err) = self.running_actions_manager.kill_operation(&operation_id).await {
event!(
Level::ERROR,
action_id = hex::encode(action_id),
?operation_id,
?err,
"Failed to send kill request for action"
"Failed to send kill request for operation"
);
};
}
Expand Down Expand Up @@ -257,7 +266,7 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a,
.and_then(|action| {
event!(
Level::INFO,
action_id = hex::encode(action.get_action_id()),
operation_id = ?action.get_operation_id(),
"Received request to run action"
);
action
Expand Down
Loading