Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement short retries in single threaded context #223

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkcs11/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod verify;
use std::sync::atomic::Ordering;
use std::{ptr::addr_of_mut, sync::Arc};

use crate::config::device::{RetryThreadMessage, RETRY_THREAD};
use crate::{
backend::events::{fetch_slots_state, EventsManager},
data::{self, DEVICE, EVENTS_MANAGER, THREADS_ALLOWED, TOKENS_STATE},
Expand Down Expand Up @@ -110,6 +111,9 @@ pub extern "C" fn C_Finalize(pReserved: CK_VOID_PTR) -> CK_RV {
return cryptoki_sys::CKR_ARGUMENTS_BAD;
}
DEVICE.store(None);
if THREADS_ALLOWED.load(Ordering::Relaxed) {
RETRY_THREAD.send(RetryThreadMessage::Finalize).unwrap();
}
EVENTS_MANAGER.write().unwrap().finalized = true;

cryptoki_sys::CKR_OK
Expand Down
219 changes: 167 additions & 52 deletions pkcs11/src/backend/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,36 @@ use std::{
time::Duration,
};

use crate::config::{
config_file::{RetryConfig, UserConfig},
device::{InstanceAttempt, InstanceData, Slot},
use crate::{
config::{
config_file::{RetryConfig, UserConfig},
device::{InstanceAttempt, InstanceData, Slot},
},
data::THREADS_ALLOWED,
};

use super::{ApiError, Error};

/// Tells the caller whether a selected instance can be used as-is or must
/// first pass a health check.
#[derive(Debug)]
enum ShouldHealthCheck {
    /// The instance can be used immediately.
    RunDirectly,
    /// The instance has to be health checked before it is used.
    HealthCheckFirst,
}

impl ShouldHealthCheck {
    /// Returns `true` only for [`ShouldHealthCheck::HealthCheckFirst`].
    fn should_check(&self) -> bool {
        match self {
            Self::HealthCheckFirst => true,
            Self::RunDirectly => false,
        }
    }
}

/// Whether the caller of `next_instance` accepts being handed an instance that
/// must be health checked before it is used.
#[derive(Debug, Clone, Copy)]
enum HealthCheck {
    /// When threads are not allowed, an instance in the `Retry` state may be
    /// returned together with a request to health check it first.
    Possible,
    /// Instances in the `Retry` state are skipped during instance selection.
    Avoid,
}

#[derive(Debug)]
pub struct LoginCtx {
slot: Arc<Slot>,
Expand Down Expand Up @@ -71,6 +94,35 @@ impl std::fmt::Display for LoginError {
}
}

/// Probe the instance's `/health/ready` endpoint with a 1 second timeout.
///
/// The client's connection pool is cleared before the request is sent.
/// Returns `true` when the endpoint answers with status 200, in which case
/// the instance's failure state is cleared. Any other outcome is logged as a
/// warning, bumps the instance's failure state and returns `false`.
fn health_check_get_timeout(instance: &InstanceData) -> bool {
    let config = &instance.config;
    config.client.clear_pool();

    let url = format!("{}/health/ready", config.base_path);
    let request = config.client.get(&url).timeout(Duration::from_secs(1));
    // Only send a user-agent header when one is configured.
    let request = match config.user_agent {
        Some(ref agent) => request.set("user-agent", agent),
        None => request,
    };

    let healthy = match request.call() {
        Ok(response) if response.status() == 200 => true,
        Ok(response) => {
            log::warn!("Failed retry {}", response.status_text());
            false
        }
        Err(err) => {
            log::warn!("Failed retry {err:?}");
            false
        }
    };

    if healthy {
        instance.clear_failed();
    } else {
        instance.bump_failed();
    }
    healthy
}

impl LoginCtx {
pub fn new(slot: Arc<Slot>, admin_allowed: bool, operator_allowed: bool) -> Self {
let mut ck_state = CKS_RO_PUBLIC_SESSION;
Expand Down Expand Up @@ -113,7 +165,7 @@ impl LoginCtx {
pub fn login(&mut self, user_type: CK_USER_TYPE, pin: String) -> Result<(), LoginError> {
trace!("Login as {:?} with pin", user_type);

let expected = match user_type {
let (user_status, user_mode) = match user_type {
CKU_CONTEXT_SPECIFIC => return Err(LoginError::InvalidUser),
CKU_SO => {
trace!("administrator: {:?}", self.slot.administrator);
Expand All @@ -126,7 +178,7 @@ impl LoginCtx {
}),
};
self.admin_allowed = true;
(UserStatus::Administrator, self.administrator())
(UserStatus::Administrator, UserMode::Administrator)
}
CKU_USER => {
self.operator_login_override = match self.operator_config() {
Expand All @@ -137,70 +189,103 @@ impl LoginCtx {
}),
};
self.operator_allowed = true;
(UserStatus::Operator, self.operator())
(UserStatus::Operator, UserMode::Operator)
}
_ => return Err(LoginError::BadArgument),
};

trace!("Config: {:?}", expected.1);
let got_user = self
.try_(get_current_user_status, user_mode)
.map_err(|err| {
error!("Login check failed: {err:?}");
LoginError::UserNotPresent
})?;

let config = expected.1.ok_or(LoginError::UserNotPresent)?.config;

if get_current_user_status(&config) == expected.0 {
self.ck_state = match expected.0 {
if got_user == user_status {
self.ck_state = match user_status {
UserStatus::Operator => CKS_RW_USER_FUNCTIONS,
UserStatus::Administrator => CKS_RW_SO_FUNCTIONS,
UserStatus::LoggedOut => CKS_RO_PUBLIC_SESSION,
};
Ok(())
} else {
error!("Failed to login as {:?} with pin", expected.0);
error!("Failed to login as {user_mode:?} with pin, got user {got_user:?}");
Err(LoginError::IncorrectPin)
}
}

fn next_instance(&self) -> &InstanceData {
fn next_instance(
&self,
accept_health_check: HealthCheck,
) -> (&InstanceData, ShouldHealthCheck) {
let threads_allowed = THREADS_ALLOWED.load(Relaxed);
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
let instance = &self.slot.instances[index];
match instance.should_try() {
InstanceAttempt::Failed => {}
InstanceAttempt::Working | InstanceAttempt::Retry => return instance,
match (instance.should_try(), threads_allowed, accept_health_check) {
(InstanceAttempt::Failed, _, _)
| (InstanceAttempt::Retry, true, _)
| (InstanceAttempt::Retry, false, HealthCheck::Avoid) => {}
(InstanceAttempt::Working, _, _) => return (instance, ShouldHealthCheck::RunDirectly),
(InstanceAttempt::Retry, false, HealthCheck::Possible) => {
return (instance, ShouldHealthCheck::HealthCheckFirst)
}
}
for i in 0..self.slot.instances.len() - 1 {
let instance = &self.slot.instances[index + i];

match instance.should_try() {
InstanceAttempt::Failed => continue,
InstanceAttempt::Working | InstanceAttempt::Retry => {
match (instance.should_try(), threads_allowed, accept_health_check) {
(InstanceAttempt::Failed, _, _)
| (InstanceAttempt::Retry, true, _)
| (InstanceAttempt::Retry, false, HealthCheck::Avoid) => continue,
(InstanceAttempt::Working, _, _) => {
// This is not true round-robin in case of multithreaded access
// This is degraded mode so best-effort is attempted at best
self.slot.instance_balancer.fetch_add(i, Relaxed);
return instance;
return (instance, ShouldHealthCheck::RunDirectly);
}
(InstanceAttempt::Retry, false, HealthCheck::Possible) => {
// This is not true round-robin in case of multithreaded access
// This is degraded mode so best-effort is attempted at best
self.slot.instance_balancer.fetch_add(i, Relaxed);
return (instance, ShouldHealthCheck::HealthCheckFirst);
}
}
}

// No instance is valid, return a failed instance for an attempt
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
&self.slot.instances[index]
// Instance is not valid, don't try health check, it would only slow things down
(&self.slot.instances[index], ShouldHealthCheck::RunDirectly)
}

fn operator(&self) -> Option<InstanceData> {
get_user_api_config(self.operator_config(), self.next_instance())
fn operator(
&self,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance(accept_health_check);
get_user_api_config(self.operator_config(), instance).map(|c| (c, should_health_check))
}

fn administrator(&self) -> Option<InstanceData> {
get_user_api_config(self.admin_config(), self.next_instance())
fn administrator(
&self,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance(accept_health_check);
get_user_api_config(self.admin_config(), instance).map(|c| (c, should_health_check))
}

fn operator_or_administrator(&self) -> Option<InstanceData> {
self.operator().or_else(|| self.administrator())
fn operator_or_administrator(
&self,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
self.operator(accept_health_check)
.or_else(|| self.administrator(accept_health_check))
}

fn guest(&self) -> &InstanceData {
self.next_instance()
fn guest(&self, accept_health_check: HealthCheck) -> (&InstanceData, ShouldHealthCheck) {
self.next_instance(accept_health_check)
}

pub fn can_run_mode(&self, mode: UserMode) -> bool {
Expand All @@ -225,12 +310,21 @@ impl LoginCtx {
self.ck_state = CKS_RO_PUBLIC_SESSION;
}

pub fn get_config_user_mode(&self, user_mode: &UserMode) -> Option<InstanceData> {
fn get_config_user_mode(
&self,
user_mode: &UserMode,
accept_health_check: HealthCheck,
) -> Option<(InstanceData, ShouldHealthCheck)> {
match user_mode {
UserMode::Operator => self.operator(),
UserMode::Administrator => self.administrator(),
UserMode::Guest => Some(self.guest().clone()),
UserMode::OperatorOrAdministrator => self.operator_or_administrator(),
UserMode::Operator => self.operator(accept_health_check),
UserMode::Administrator => self.administrator(accept_health_check),
UserMode::Guest => {
let (instance, should_health_check) = self.guest(accept_health_check);
Some((instance.clone(), should_health_check))
}
UserMode::OperatorOrAdministrator => {
self.operator_or_administrator(accept_health_check)
}
}
}

Expand All @@ -239,8 +333,11 @@ impl LoginCtx {
where
F: FnOnce(&Configuration) -> Result<R, apis::Error<T>> + Clone,
{
let mut health_check_count = 0;
// we loop for a maximum of instances.len() times
let Some(mut instance) = self.get_config_user_mode(&user_mode) else {
let Some((mut instance, mut should_health_check)) =
self.get_config_user_mode(&user_mode, HealthCheck::Possible)
else {
return Err(Error::Login(LoginError::UserNotPresent));
};

Expand All @@ -256,12 +353,30 @@ impl LoginCtx {
let delay = Duration::from_secs(delay_seconds);

loop {
let accept_health_check = if health_check_count < 3 {
HealthCheck::Possible
} else {
HealthCheck::Avoid
};
if retry_count > retry_limit {
error!(
"Retry count exceeded after {retry_limit} attempts, instance is unreachable"
);
return Err(ApiError::InstanceRemoved.into());
}

if should_health_check.should_check() && !health_check_get_timeout(&instance) {
health_check_count += 1;
// Instance is not valid, we try the next one
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode, accept_health_check)
{
instance = new_instance;
should_health_check = new_should_health_check;
}
continue;
}

retry_count += 1;
let api_call_clone = api_call.clone();
match api_call_clone(&instance.config) {
Expand All @@ -280,8 +395,11 @@ impl LoginCtx {

warn!("Connection attempt {retry_count} failed: Status error connecting to the instance, {:?}, retrying in {delay_seconds}s", err.status);
thread::sleep(delay);
if let Some(new_conf) = self.get_config_user_mode(&user_mode) {
instance = new_conf;
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode, accept_health_check)
{
instance = new_instance;
should_health_check = new_should_health_check;
}
}

Expand All @@ -296,8 +414,11 @@ impl LoginCtx {
instance.bump_failed();
warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {err}, retrying in {delay_seconds}s");
thread::sleep(delay);
if let Some(new_conf) = self.get_config_user_mode(&user_mode) {
instance = new_conf;
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode, accept_health_check)
{
instance = new_instance;
should_health_check = new_should_health_check;
}
}
// Otherwise, return the error
Expand Down Expand Up @@ -349,7 +470,7 @@ impl LoginCtx {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UserMode {
Operator,
Administrator,
Expand All @@ -366,29 +487,23 @@ pub enum UserStatus {

pub fn get_current_user_status(
api_config: &nethsm_sdk_rs::apis::configuration::Configuration,
) -> UserStatus {
) -> Result<UserStatus, apis::Error<default_api::UsersUserIdGetError>> {
let auth = match api_config.basic_auth.as_ref() {
Some(auth) => auth,
None => return UserStatus::LoggedOut,
None => return Ok(UserStatus::LoggedOut),
};

if auth.1.is_none() {
return UserStatus::LoggedOut;
return Ok(UserStatus::LoggedOut);
}

let user = match default_api::users_user_id_get(api_config, auth.0.as_str()) {
Ok(user) => user.entity,
Err(err) => {
error!("Failed to get user: {:?}", err);
return UserStatus::LoggedOut;
}
};
let user = default_api::users_user_id_get(api_config, auth.0.as_str())?;

match user.role {
Ok(match user.entity.role {
UserRole::Operator => UserStatus::Operator,
UserRole::Administrator => UserStatus::Administrator,
_ => UserStatus::LoggedOut,
}
})
}
// Check if the user is logged in and then return the configuration to connect as this user
fn get_user_api_config(
Expand Down
Loading
Loading