Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enforce user must specify access_key and secret_key using aws auth #11120

Merged
merged 6 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions src/connector/src/aws_auth.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reject it in fe?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But might be ok as a quick fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still discussing with @arkbriar, whether we should continuously support public buckets.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still discussing with @arkbriar, whether we should continuously support public buckets.

Prefer to keep it. It's up to user's choice, not our fault.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::default_provider::region::DefaultRegionChain;
use aws_config::sts::AssumeRoleProvider;
use aws_credential_types::provider::SharedCredentialsProvider;
Expand Down Expand Up @@ -86,24 +85,12 @@ impl AwsAuthProps {
}
}

async fn build_credential_provider(&self) -> anyhow::Result<SharedCredentialsProvider> {
if self.access_key.is_some() && self.secret_key.is_some() {
Ok(SharedCredentialsProvider::new(
aws_credential_types::Credentials::from_keys(
self.access_key.as_ref().unwrap(),
self.secret_key.as_ref().unwrap(),
self.session_token.clone(),
),
))
} else {
let region = self.build_region().await?;
let mut chain = DefaultCredentialsChain::builder().region(region);

if let Some(profile_name) = self.profile.as_ref() {
chain = chain.profile_name(profile_name)
}
Ok(SharedCredentialsProvider::new(chain.build().await))
}
fn build_credential_provider(&self) -> SharedCredentialsProvider {
SharedCredentialsProvider::new(aws_credential_types::Credentials::from_keys(
self.access_key.as_ref().unwrap_or(&"".into()),
self.secret_key.as_ref().unwrap_or(&"".into()),
self.session_token.clone(),
))
}

async fn with_role_provider(
Expand All @@ -127,7 +114,7 @@ impl AwsAuthProps {
pub async fn build_config(&self) -> anyhow::Result<SdkConfig> {
let region = self.build_region().await?;
let credentials_provider = self
.with_role_provider(self.build_credential_provider().await?)
.with_role_provider(self.build_credential_provider())
.await?;
let config_loader = aws_config::from_env()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is about finding an alternative for from_env.

Important: Using the aws-config crate to configure the SDK is preferred to invoking this builder directly. Using this builder directly won’t pull in any AWS recommended default configuration values.

---- from doc

It is not recommended to build SdkConfig directly.

.region(region)
Expand Down
29 changes: 16 additions & 13 deletions src/connector/src/aws_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::time::Duration;

use aws_config::timeout::TimeoutConfig;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::{client as s3_client, config as s3_config};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
Expand Down Expand Up @@ -117,12 +118,9 @@ pub async fn load_file_descriptor_from_s3(
location: &Url,
config: &AwsAuthProps,
) -> Result<Vec<u8>> {
let bucket = location.domain().ok_or_else(|| {
RwError::from(InternalError(format!(
"Illegal Protobuf schema path {}",
location
)))
})?;
let bucket = location
.domain()
.ok_or_else(|| RwError::from(InternalError(format!("Illegal file path {}", location))))?;
let key = location.path().replace('/', "");
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
Expand All @@ -132,13 +130,18 @@ pub async fn load_file_descriptor_from_s3(
.key(&key)
.send()
.await
.map_err(|e| RwError::from(InternalError(e.to_string())))?;
.map_err(|e| {
RwError::from(InternalError(format!(
"get file {} err:{}",
location,
DisplayErrorContext(e)
)))
})?;

let body = response.body.collect().await.map_err(|e| {
RwError::from(InternalError(format!(
"Read Protobuf schema file from s3 {}",
e
)))
})?;
let body = response
.body
.collect()
.await
.map_err(|e| RwError::from(InternalError(format!("Read file from s3 {}", e))))?;
Ok(body.into_bytes().to_vec())
}
8 changes: 6 additions & 2 deletions src/connector/src/source/filesystem/s3/enumerator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use aws_sdk_s3::client::Client;
use aws_sdk_s3::error::DisplayErrorContext;
use itertools::Itertools;

use crate::aws_auth::AwsAuthProps;
Expand Down Expand Up @@ -106,7 +107,10 @@ impl SplitEnumerator for S3SplitEnumerator {
if let Some(continuation_token) = next_continuation_token.take() {
req = req.continuation_token(continuation_token);
}
let mut res = req.send().await?;
let mut res = req
.send()
.await
.map_err(|e| anyhow!(DisplayErrorContext(e)))?;
objects.extend(res.contents.take().unwrap_or_default());
if res.is_truncated() {
next_continuation_token = Some(res.next_continuation_token.unwrap())
Expand Down