Skip to content

Commit

Permalink
Implement short retries in single threaded context
Browse files Browse the repository at this point in the history
Also, fix round retry mechanism not being used for login
This two are merged because the changing of the `next_instance` signature
showed that `login()` did not use the retry mechanism
  • Loading branch information
sosthene-nitrokey committed Sep 16, 2024
1 parent ff3b7de commit bcc41fc
Showing 1 changed file with 121 additions and 47 deletions.
168 changes: 121 additions & 47 deletions pkcs11/src/backend/login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,30 @@ use std::{
time::Duration,
};

use crate::config::{
config_file::{RetryConfig, UserConfig},
device::{InstanceAttempt, InstanceData, Slot},
use crate::{
config::{
config_file::{RetryConfig, UserConfig},
device::{InstanceAttempt, InstanceData, Slot},
},
data::THREADS_ALLOWED,
};

use super::{ApiError, Error};

#[derive(Debug)]
enum ShouldHealthCheck {
/// The instance is ready to be used
RunDirectly,
/// The instance needs to first be health checked
HealthCheckFirst,
}

impl ShouldHealthCheck {
fn should_check(&self) -> bool {
matches!(self, ShouldHealthCheck::HealthCheckFirst)
}
}

#[derive(Debug)]
pub struct LoginCtx {
slot: Arc<Slot>,
Expand Down Expand Up @@ -71,6 +88,34 @@ impl std::fmt::Display for LoginError {
}
}

/// Perform a health check with a timeout of 1 second
fn health_check_get_timeout(instance: &InstanceData) -> bool {
let config = &instance.config;
let uri_str = format!("{}/health/ready", config.base_path);
let mut req = config.client.get(&uri_str).timeout(Duration::from_secs(1));
if let Some(ref user_agent) = config.user_agent {
req = req.set("user-agent", user_agent);
}

match req.call() {
Ok(r) => {
if r.status() == 200 {
instance.clear_failed();
return true;
}
log::warn!("Failed retry {}", r.status_text());
instance.bump_failed();
false
}

Err(err) => {
log::warn!("Failed retry {err:?}");
instance.bump_failed();
false
}
}
}

impl LoginCtx {
pub fn new(slot: Arc<Slot>, admin_allowed: bool, operator_allowed: bool) -> Self {
let mut ck_state = CKS_RO_PUBLIC_SESSION;
Expand Down Expand Up @@ -113,7 +158,7 @@ impl LoginCtx {
pub fn login(&mut self, user_type: CK_USER_TYPE, pin: String) -> Result<(), LoginError> {
trace!("Login as {:?} with pin", user_type);

let expected = match user_type {
let (user_status, user_mode) = match user_type {
CKU_CONTEXT_SPECIFIC => return Err(LoginError::InvalidUser),
CKU_SO => {
trace!("administrator: {:?}", self.slot.administrator);
Expand All @@ -126,7 +171,7 @@ impl LoginCtx {
}),
};
self.admin_allowed = true;
(UserStatus::Administrator, self.administrator())
(UserStatus::Administrator, UserMode::Administrator)
}
CKU_USER => {
self.operator_login_override = match self.operator_config() {
Expand All @@ -137,69 +182,85 @@ impl LoginCtx {
}),
};
self.operator_allowed = true;
(UserStatus::Operator, self.operator())
(UserStatus::Operator, UserMode::Operator)
}
_ => return Err(LoginError::BadArgument),
};

trace!("Config: {:?}", expected.1);
let got_user = self
.try_(get_current_user_status, user_mode)
.map_err(|err| {
error!("Login check failed: {err:?}");
LoginError::UserNotPresent
})?;

let config = expected.1.ok_or(LoginError::UserNotPresent)?.config;

if get_current_user_status(&config) == expected.0 {
self.ck_state = match expected.0 {
if got_user == user_status {
self.ck_state = match user_status {
UserStatus::Operator => CKS_RW_USER_FUNCTIONS,
UserStatus::Administrator => CKS_RW_SO_FUNCTIONS,
UserStatus::LoggedOut => CKS_RO_PUBLIC_SESSION,
};
Ok(())
} else {
error!("Failed to login as {:?} with pin", expected.0);
error!("Failed to login as {user_mode:?} with pin, got user {got_user:?}");
Err(LoginError::IncorrectPin)
}
}

fn next_instance(&self) -> &InstanceData {
fn next_instance(&self) -> (&InstanceData, ShouldHealthCheck) {
let threads_allowed = THREADS_ALLOWED.load(Relaxed);
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
let instance = &self.slot.instances[index];
match instance.should_try() {
InstanceAttempt::Failed => {}
InstanceAttempt::Working | InstanceAttempt::Retry => return instance,
match (instance.should_try(), threads_allowed) {
(InstanceAttempt::Failed, _) | (InstanceAttempt::Retry, true) => {}
(InstanceAttempt::Working, _) => return (instance, ShouldHealthCheck::RunDirectly),
(InstanceAttempt::Retry, false) => {
return (instance, ShouldHealthCheck::HealthCheckFirst)
}
}
for i in 0..self.slot.instances.len() - 1 {
let instance = &self.slot.instances[index + i];

match instance.should_try() {
InstanceAttempt::Failed => continue,
InstanceAttempt::Working | InstanceAttempt::Retry => {
match (instance.should_try(), threads_allowed) {
(InstanceAttempt::Failed, _) | (InstanceAttempt::Retry, true) => continue,
(InstanceAttempt::Working, _) => {
// This not true round-robin in case of multithreaded acces
// This is degraded mode so best-effort is attempted at best
self.slot.instance_balancer.fetch_add(i, Relaxed);
return (instance, ShouldHealthCheck::RunDirectly);
}
(InstanceAttempt::Retry, false) => {
// This not true round-robin in case of multithreaded acces
// This is degraded mode so best-effort is attempted at best
self.slot.instance_balancer.fetch_add(i, Relaxed);
return instance;
return (instance, ShouldHealthCheck::HealthCheckFirst);
}
}
}

// No instance is valid, return a failed instance for an attempt
let index = self.slot.instance_balancer.fetch_add(1, Relaxed);
let index = index % self.slot.instances.len();
&self.slot.instances[index]
// Instance is not valid, don't try health check, it would only slow things down
(&self.slot.instances[index], ShouldHealthCheck::RunDirectly)
}

fn operator(&self) -> Option<InstanceData> {
get_user_api_config(self.operator_config(), self.next_instance())
fn operator(&self) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance();
get_user_api_config(self.operator_config(), instance).map(|c| (c, should_health_check))
}

fn administrator(&self) -> Option<InstanceData> {
get_user_api_config(self.admin_config(), self.next_instance())
fn administrator(&self) -> Option<(InstanceData, ShouldHealthCheck)> {
let (instance, should_health_check) = self.next_instance();
get_user_api_config(self.admin_config(), instance).map(|c| (c, should_health_check))
}

fn operator_or_administrator(&self) -> Option<InstanceData> {
fn operator_or_administrator(&self) -> Option<(InstanceData, ShouldHealthCheck)> {
self.operator().or_else(|| self.administrator())
}

fn guest(&self) -> &InstanceData {
fn guest(&self) -> (&InstanceData, ShouldHealthCheck) {
self.next_instance()
}

Expand All @@ -225,11 +286,17 @@ impl LoginCtx {
self.ck_state = CKS_RO_PUBLIC_SESSION;
}

pub fn get_config_user_mode(&self, user_mode: &UserMode) -> Option<InstanceData> {
fn get_config_user_mode(
&self,
user_mode: &UserMode,
) -> Option<(InstanceData, ShouldHealthCheck)> {
match user_mode {
UserMode::Operator => self.operator(),
UserMode::Administrator => self.administrator(),
UserMode::Guest => Some(self.guest().clone()),
UserMode::Guest => {
let (instance, should_health_check) = self.guest();
Some((instance.clone(), should_health_check))
}
UserMode::OperatorOrAdministrator => self.operator_or_administrator(),
}
}
Expand All @@ -240,7 +307,8 @@ impl LoginCtx {
F: FnOnce(&Configuration) -> Result<R, apis::Error<T>> + Clone,
{
// we loop for a maximum of instances.len() times
let Some(mut instance) = self.get_config_user_mode(&user_mode) else {
let Some((mut instance, mut should_health_check)) = self.get_config_user_mode(&user_mode)
else {
return Err(Error::Login(LoginError::UserNotPresent));
};

Expand All @@ -262,6 +330,12 @@ impl LoginCtx {
);
return Err(ApiError::InstanceRemoved.into());
}

if should_health_check.should_check() && !health_check_get_timeout(&instance) {
// Instance is not valid, we try the next one
continue;
}

retry_count += 1;
let api_call_clone = api_call.clone();
match api_call_clone(&instance.config) {
Expand All @@ -280,8 +354,11 @@ impl LoginCtx {

warn!("Connection attempt {retry_count} failed: Status error connecting to the instance, {:?}, retrying in {delay_seconds}s", err.status);
thread::sleep(delay);
if let Some(new_conf) = self.get_config_user_mode(&user_mode) {
instance = new_conf;
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode)
{
instance = new_instance;
should_health_check = new_should_health_check;
}
}

Expand All @@ -296,8 +373,11 @@ impl LoginCtx {
instance.bump_failed();
warn!("Connection attempt {retry_count} failed: IO error connecting to the instance, {err}, retrying in {delay_seconds}s");
thread::sleep(delay);
if let Some(new_conf) = self.get_config_user_mode(&user_mode) {
instance = new_conf;
if let Some((new_instance, new_should_health_check)) =
self.get_config_user_mode(&user_mode)
{
instance = new_instance;
should_health_check = new_should_health_check;
}
}
// Otherwise, return the error
Expand Down Expand Up @@ -349,7 +429,7 @@ impl LoginCtx {
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum UserMode {
Operator,
Administrator,
Expand All @@ -366,29 +446,23 @@ pub enum UserStatus {

pub fn get_current_user_status(
api_config: &nethsm_sdk_rs::apis::configuration::Configuration,
) -> UserStatus {
) -> Result<UserStatus, apis::Error<default_api::UsersUserIdGetError>> {
let auth = match api_config.basic_auth.as_ref() {
Some(auth) => auth,
None => return UserStatus::LoggedOut,
None => return Ok(UserStatus::LoggedOut),
};

if auth.1.is_none() {
return UserStatus::LoggedOut;
return Ok(UserStatus::LoggedOut);
}

let user = match default_api::users_user_id_get(api_config, auth.0.as_str()) {
Ok(user) => user.entity,
Err(err) => {
error!("Failed to get user: {:?}", err);
return UserStatus::LoggedOut;
}
};
let user = default_api::users_user_id_get(api_config, auth.0.as_str())?;

match user.role {
Ok(match user.entity.role {
UserRole::Operator => UserStatus::Operator,
UserRole::Administrator => UserStatus::Administrator,
_ => UserStatus::LoggedOut,
}
})
}
// Check if the user is logged in and then return the configuration to connect as this user
fn get_user_api_config(
Expand Down

0 comments on commit bcc41fc

Please sign in to comment.