Skip to content

Commit

Permalink
Add pool_idle_timeout options for s3 and sts clients (#458)
Browse files Browse the repository at this point in the history
  • Loading branch information
mosyp authored Oct 12, 2021
1 parent 2800078 commit 8d12ad8
Showing 1 changed file with 70 additions and 17 deletions.
87 changes: 70 additions & 17 deletions rust/src/storage/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{fmt, pin::Pin};
use chrono::{DateTime, FixedOffset, Utc};
use futures::Stream;
use log::debug;
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_core::{HttpClient, HttpConfig, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_s3::{
CopyObjectRequest, Delete, DeleteObjectRequest, DeleteObjectsRequest, GetObjectRequest,
Expand All @@ -19,7 +19,10 @@ use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;

use super::{parse_uri, ObjectMeta, StorageBackend, StorageError};
use rusoto_core::credential::{AwsCredentials, CredentialsError, ProvideAwsCredentials};
use rusoto_core::credential::{
AwsCredentials, CredentialsError, DefaultCredentialsProvider, ProvideAwsCredentials,
};
use std::time::Duration;
use uuid::Uuid;

pub mod dynamodb_lock;
Expand All @@ -40,7 +43,14 @@ pub mod s3_storage_options {
pub const AWS_S3_ASSUME_ROLE_ARN: &str = "AWS_S3_ASSUME_ROLE_ARN";
/// The role session name to use when a role is assumed. If not provided a random session name is generated.
pub const AWS_S3_ROLE_SESSION_NAME: &str = "AWS_S3_ROLE_SESSION_NAME";

/// The `pool_idle_timeout` option of aws http client. Has to be lower than 20 seconds, which is
/// default S3 server timeout https://aws.amazon.com/premiumsupport/knowledge-center/s3-socket-connection-timeout-error/.
/// However, since rusoto uses hyper as a client, its default timeout is 90 seconds https://docs.rs/hyper/0.13.2/hyper/client/struct.Builder.html#method.keep_alive_timeout.
/// Hence, the `connection closed before message completed` could occur.
/// To avoid that, the default value of this setting is 15 seconds if it's not set otherwise.
pub const AWS_S3_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_S3_POOL_IDLE_TIMEOUT_SECONDS";
/// The `pool_idle_timeout` for the aws sts client. See the reasoning in `AWS_S3_POOL_IDLE_TIMEOUT_SECONDS`.
pub const AWS_STS_POOL_IDLE_TIMEOUT_SECONDS: &str = "AWS_STS_POOL_IDLE_TIMEOUT_SECONDS";
/// The web identity token file to use when using a web identity provider.
/// NOTE: web identity related options are set in the environment when creating an instance of [crate::storage::s3::S3StorageOptions].
/// See also https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env.
Expand All @@ -66,6 +76,8 @@ pub mod s3_storage_options {
AWS_WEB_IDENTITY_TOKEN_FILE,
AWS_ROLE_ARN,
AWS_ROLE_SESSION_NAME,
AWS_S3_POOL_IDLE_TIMEOUT_SECONDS,
AWS_STS_POOL_IDLE_TIMEOUT_SECONDS,
];
}

Expand All @@ -80,6 +92,8 @@ pub struct S3StorageOptions {
assume_role_arn: Option<String>,
assume_role_session_name: Option<String>,
use_web_identity: bool,
s3_pool_idle_timeout: Duration,
sts_pool_idle_timeout: Duration,
extra_opts: HashMap<String, String>,
}

Expand Down Expand Up @@ -112,6 +126,17 @@ impl S3StorageOptions {
Self::ensure_env_var(&options, s3_storage_options::AWS_ROLE_ARN);
Self::ensure_env_var(&options, s3_storage_options::AWS_ROLE_SESSION_NAME);

let s3_pool_idle_timeout = Self::u64_or_default(
&options,
s3_storage_options::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS,
15,
);
let sts_pool_idle_timeout = Self::u64_or_default(
&options,
s3_storage_options::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS,
10,
);

Self {
_endpoint_url: endpoint_url,
region,
Expand All @@ -126,6 +151,8 @@ impl S3StorageOptions {
),
use_web_identity: std::env::var(s3_storage_options::AWS_WEB_IDENTITY_TOKEN_FILE)
.is_ok(),
s3_pool_idle_timeout: Duration::from_secs(s3_pool_idle_timeout),
sts_pool_idle_timeout: Duration::from_secs(sts_pool_idle_timeout),
extra_opts,
}
}
Expand All @@ -141,6 +168,12 @@ impl S3StorageOptions {
.map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned()))
}

fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
Self::str_option(map, key)
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}

fn ensure_env_var(map: &HashMap<String, String>, key: &str) {
if let Some(val) = Self::str_option(map, key) {
std::env::set_var(key, val);
Expand Down Expand Up @@ -207,19 +240,18 @@ impl From<RusotoError<rusoto_s3::CopyObjectError>> for StorageError {
/// The extension of StsAssumeRoleSessionCredentialsProvider in order to provide new session_name
/// on each credentials refresh.
struct AssumeRoleCredentialsProvider {
sts_client: StsClient,
assume_role_arn: String,
region: Region,
session_name: Option<String>,
}

#[async_trait::async_trait]
impl ProvideAwsCredentials for AssumeRoleCredentialsProvider {
async fn credentials(&self) -> Result<AwsCredentials, CredentialsError> {
let sts_client = StsClient::new(self.region.clone());
let session_name = self.session_name.as_deref().unwrap_or("delta-rs");
let session_name = format!("{}-{}", session_name, Uuid::new_v4());
let provider = StsAssumeRoleSessionCredentialsProvider::new(
sts_client,
self.sts_client.clone(),
self.assume_role_arn.clone(),
session_name,
None,
Expand All @@ -235,38 +267,48 @@ fn get_sts_assume_role_provider(
assume_role_arn: String,
options: &S3StorageOptions,
) -> Result<AutoRefreshingProvider<AssumeRoleCredentialsProvider>, StorageError> {
let sts_client = StsClient::new_with(
create_http_client(options.sts_pool_idle_timeout)?,
DefaultCredentialsProvider::new()?,
options.region.clone(),
);

let provider = AssumeRoleCredentialsProvider {
sts_client,
assume_role_arn,
region: options.region.clone(),
session_name: options.assume_role_session_name.clone(),
};

Ok(AutoRefreshingProvider::new(provider)?)
}

fn create_http_client(pool_idle_timeout: Duration) -> Result<HttpClient, StorageError> {
let mut config = HttpConfig::new();
config.pool_idle_timeout(pool_idle_timeout);
Ok(HttpClient::new_with_config(config)?)
}

fn get_web_identity_provider() -> Result<AutoRefreshingProvider<WebIdentityProvider>, StorageError>
{
let provider = WebIdentityProvider::from_k8s_env();
Ok(AutoRefreshingProvider::new(provider)?)
}

fn create_s3_client(options: &S3StorageOptions) -> Result<S3Client, StorageError> {
let http_client = create_http_client(options.s3_pool_idle_timeout)?;
let region = options.region.clone();
if options.use_web_identity {
let provider = get_web_identity_provider()?;
Ok(S3Client::new_with(
HttpClient::new()?,
provider,
options.region.clone(),
))
Ok(S3Client::new_with(http_client, provider, region))
} else if let Some(assume_role_arn) = &options.assume_role_arn {
let provider = get_sts_assume_role_provider(assume_role_arn.to_owned(), options)?;
Ok(S3Client::new_with(http_client, provider, region))
} else {
Ok(S3Client::new_with(
HttpClient::new()?,
provider,
options.region.clone(),
http_client,
DefaultCredentialsProvider::new()?,
region,
))
} else {
Ok(S3Client::new(options.region.clone()))
}
}

Expand Down Expand Up @@ -878,6 +920,8 @@ mod tests {
assume_role_session_name: Some("session_name".to_string()),
use_web_identity: true,
locking_provider: Some("dynamodb".to_string()),
s3_pool_idle_timeout: Duration::from_secs(15),
sts_pool_idle_timeout: Duration::from_secs(10),
extra_opts: HashMap::new(),
},
options
Expand All @@ -893,6 +937,8 @@ mod tests {
s3_storage_options::AWS_S3_ASSUME_ROLE_ARN.to_string() => "arn:aws:iam::123456789012:role/another_role".to_string(),
s3_storage_options::AWS_S3_ROLE_SESSION_NAME.to_string() => "another_session_name".to_string(),
s3_storage_options::AWS_WEB_IDENTITY_TOKEN_FILE.to_string() => "another_token_file".to_string(),
s3_storage_options::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "1".to_string(),
s3_storage_options::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS.to_string() => "2".to_string(),
});

assert_eq!(
Expand All @@ -906,6 +952,8 @@ mod tests {
assume_role_session_name: Some("another_session_name".to_string()),
use_web_identity: true,
locking_provider: Some("another_locking_provider".to_string()),
s3_pool_idle_timeout: Duration::from_secs(1),
sts_pool_idle_timeout: Duration::from_secs(2),
extra_opts: HashMap::new(),
},
options
Expand All @@ -927,6 +975,9 @@ mod tests {
"token_file",
);

std::env::set_var(s3_storage_options::AWS_S3_POOL_IDLE_TIMEOUT_SECONDS, "1");
std::env::set_var(s3_storage_options::AWS_STS_POOL_IDLE_TIMEOUT_SECONDS, "2");

let options = S3StorageOptions::from_map(hashmap! {
s3_storage_options::AWS_REGION.to_string() => "us-west-2".to_string(),
"DYNAMO_LOCK_PARTITION_KEY_VALUE".to_string() => "my_lock".to_string(),
Expand All @@ -943,6 +994,8 @@ mod tests {
assume_role_session_name: Some("session_name".to_string()),
use_web_identity: true,
locking_provider: Some("dynamodb".to_string()),
s3_pool_idle_timeout: Duration::from_secs(1),
sts_pool_idle_timeout: Duration::from_secs(2),
extra_opts: hashmap! {
"DYNAMO_LOCK_PARTITION_KEY_VALUE".to_string() => "my_lock".to_string(),
},
Expand Down

0 comments on commit 8d12ad8

Please sign in to comment.