Skip to content

Commit

Permalink
Support tencent cloud COS storage in datafusion-cli (#9734)
Browse files Browse the repository at this point in the history
* Support tencent cloud COS storage

* Fix clippy

* Update docs/source/user-guide/cli.md

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
harveyyue and alamb committed Mar 25, 2024
1 parent f7e5581 commit ad89ff8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 24 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// Register the store for this URL. Here we don't have access
// to any command options so the only choice is to use an empty collection
match scheme {
"s3" | "oss" => {
"s3" | "oss" | "cos" => {
state = state.add_table_options_extension(AwsOptions::default());
}
"gs" | "gcs" => {
Expand Down
16 changes: 16 additions & 0 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ mod tests {
let locations = vec![
"s3://bucket/path/file.parquet",
"oss://bucket/path/file.parquet",
"cos://bucket/path/file.parquet",
"gcs://bucket/path/file.parquet",
];
let mut ctx = SessionContext::new();
Expand Down Expand Up @@ -497,6 +498,21 @@ mod tests {
Ok(())
}

// Checks that a `CREATE EXTERNAL TABLE` statement pointing at a `cos://`
// location and carrying `aws.*` options (including `aws.cos.endpoint`) makes
// `create_external_table_test` return `Ok`. Every credential value is a fake
// placeholder.
#[tokio::test]
async fn create_object_store_table_cos() -> Result<()> {
let access_key_id = "fake_access_key_id";
let secret_access_key = "fake_secret_access_key";
let endpoint = "fake_endpoint";
let location = "cos://bucket/path/file.parquet";

// Should be OK
// The locals above are interpolated by name into the OPTIONS and LOCATION clauses.
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}', 'aws.cos.endpoint' '{endpoint}') LOCATION '{location}'");
// Any error from the helper is propagated and fails the test.
create_external_table_test(location, &sql).await?;

Ok(())
}

#[tokio::test]
async fn create_object_store_table_gcs() -> Result<()> {
let service_account_path = "fake_service_account_path";
Expand Down
50 changes: 34 additions & 16 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use datafusion::common::{config_namespace, exec_datafusion_err, exec_err, internal_err};
use datafusion::common::{exec_datafusion_err, exec_err, internal_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -106,12 +106,27 @@ impl CredentialProvider for S3CredentialProvider {
/// Creates the [`AmazonS3Builder`] used for `oss://` locations.
///
/// Thin wrapper over `get_object_store_builder` that turns
/// virtual-hosted-style requests on.
pub fn get_oss_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    let virtual_hosted_style_request = true;
    get_object_store_builder(url, aws_options, virtual_hosted_style_request)
}

/// Creates the [`AmazonS3Builder`] used for `cos://` locations.
///
/// Thin wrapper over `get_object_store_builder` that leaves
/// virtual-hosted-style requests off.
pub fn get_cos_object_store_builder(
    url: &Url,
    aws_options: &AwsOptions,
) -> Result<AmazonS3Builder> {
    let virtual_hosted_style_request = false;
    get_object_store_builder(url, aws_options, virtual_hosted_style_request)
}

fn get_object_store_builder(
url: &Url,
aws_options: &AwsOptions,
virtual_hosted_style_request: bool,
) -> Result<AmazonS3Builder> {
let bucket_name = get_bucket_name(url)?;
let mut builder = AmazonS3Builder::from_env()
.with_virtual_hosted_style_request(true)
.with_virtual_hosted_style_request(virtual_hosted_style_request)
.with_bucket_name(bucket_name)
// oss don't care about the "region" field
// oss/cos don't care about the "region" field
.with_region("do_not_care");

if let (Some(access_key_id), Some(secret_access_key)) =
Expand All @@ -122,7 +137,7 @@ pub fn get_oss_object_store_builder(
.with_secret_access_key(secret_access_key);
}

if let Some(endpoint) = &aws_options.oss.endpoint {
if let Some(endpoint) = &aws_options.endpoint {
builder = builder.with_endpoint(endpoint);
}

Expand Down Expand Up @@ -171,14 +186,8 @@ pub struct AwsOptions {
pub session_token: Option<String>,
/// AWS Region
pub region: Option<String>,
/// Object Storage Service options
pub oss: OssOptions,
}

config_namespace! {
pub struct OssOptions {
pub endpoint: Option<String>, default = None
}
/// OSS or COS Endpoint
pub endpoint: Option<String>,
}

impl ExtensionOptions for AwsOptions {
Expand Down Expand Up @@ -210,8 +219,8 @@ impl ExtensionOptions for AwsOptions {
"region" => {
self.region.set(rem, value)?;
}
"oss" => {
self.oss.set(rem, value)?;
"oss" | "cos" => {
self.endpoint.set(rem, value)?;
}
_ => {
return internal_err!("Config value \"{}\" not found on AwsOptions", rem);
Expand Down Expand Up @@ -252,7 +261,7 @@ impl ExtensionOptions for AwsOptions {
.visit(&mut v, "secret_access_key", "");
self.session_token.visit(&mut v, "session_token", "");
self.region.visit(&mut v, "region", "");
self.oss.visit(&mut v, "oss", "");
self.endpoint.visit(&mut v, "endpoint", "");
v.0
}
}
Expand Down Expand Up @@ -376,7 +385,7 @@ pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) {
// Match the provided scheme against supported cloud storage schemes:
match scheme {
// For Amazon S3, Alibaba Cloud OSS or Tencent Cloud COS
"s3" | "oss" => {
"s3" | "oss" | "cos" => {
// Register AWS specific table options in the session context:
ctx.register_table_options_extension(AwsOptions::default())
}
Expand Down Expand Up @@ -415,6 +424,15 @@ pub(crate) async fn get_object_store(
let builder = get_oss_object_store_builder(url, options)?;
Arc::new(builder.build()?)
}
"cos" => {
let Some(options) = table_options.extensions.get::<AwsOptions>() else {
return exec_err!(
"Given table options incompatible with the 'cos' scheme"
);
};
let builder = get_cos_object_store_builder(url, options)?;
Arc::new(builder.build()?)
}
"gs" | "gcs" => {
let Some(options) = table_options.extensions.get::<GcpOptions>() else {
return exec_err!(
Expand Down
37 changes: 30 additions & 7 deletions docs/source/user-guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ select count(*) from hits;
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'access_key_id' '******',
'secret_access_key' '******',
'region' 'us-east-2'
'aws.access_key_id' '******',
'aws.secret_access_key' '******',
'aws.region' 'us-east-2'
)
LOCATION 's3://bucket/path/file.parquet';
```
Expand Down Expand Up @@ -365,9 +365,9 @@ Details of the environment variables that can be used are:
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'access_key_id' '******',
'secret_access_key' '******',
'endpoint' 'https://bucket.oss-cn-hangzhou.aliyuncs.com'
'aws.access_key_id' '******',
'aws.secret_access_key' '******',
'aws.oss.endpoint' 'https://bucket.oss-cn-hangzhou.aliyuncs.com'
)
LOCATION 'oss://bucket/path/file.parquet';
```
Expand All @@ -380,6 +380,29 @@ The supported OPTIONS are:
Note that the `endpoint` format of oss needs to be: `https://{bucket}.{oss-region-endpoint}`
## Registering COS Data Sources
[Tencent Cloud COS](https://cloud.tencent.com/product/cos) data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement.
```sql
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'aws.access_key_id' '******',
'aws.secret_access_key' '******',
'aws.cos.endpoint' 'https://cos.ap-singapore.myqcloud.com'
)
LOCATION 'cos://bucket/path/file.parquet';
```
The supported OPTIONS are:
- aws.access_key_id
- aws.secret_access_key
- aws.cos.endpoint
Note that the COS `endpoint` value must have the format: `https://cos.{cos-region-endpoint}`
## Registering GCS Data Sources
[Google Cloud Storage](https://cloud.google.com/storage) data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement.
Expand All @@ -388,7 +411,7 @@ Note that the `endpoint` format of oss needs to be: `https://{bucket}.{oss-regio
CREATE EXTERNAL TABLE test
STORED AS PARQUET
OPTIONS(
'service_account_path' '/tmp/gcs.json',
'gcp.service_account_path' '/tmp/gcs.json',
)
LOCATION 'gs://bucket/path/file.parquet';
```
Expand Down

0 comments on commit ad89ff8

Please sign in to comment.