Skip to content

feat: support connection in stage related infer_schema #13890

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/actions/test_sqllogic_stage/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ runs:
target: ${{ inputs.target }}
artifacts: sqllogictests,meta,query
- name: Minio Setup for (ubuntu-latest only)
if: inputs.storage == 's3'
shell: bash
run: |
docker run -d --network host --name minio \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
// limitations under the License.

use std::any::Any;
use std::collections::BTreeMap;
use std::sync::Arc;

use common_ast::ast::FileLocation;
use common_ast::ast::UriLocation;
use common_catalog::plan::DataSourcePlan;
use common_catalog::plan::PartStatistics;
use common_catalog::plan::Partitions;
Expand All @@ -41,11 +44,12 @@ use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_pipeline_sources::AsyncSource;
use common_pipeline_sources::AsyncSourcer;
use common_sql::binder::resolve_stage_location;
use common_sql::binder::resolve_file_location;
use common_storage::init_stage_operator;
use common_storage::read_parquet_schema_async;
use common_storage::read_parquet_schema_async_rs;
use common_storage::StageFilesInfo;
use opendal::Scheme;

use crate::pipelines::processors::OutputPort;
use crate::sessions::TableContext;
Expand Down Expand Up @@ -179,8 +183,34 @@ impl AsyncSource for InferSchemaSource {
}
self.is_finished = true;

let (stage_info, path) =
resolve_stage_location(&self.ctx, &self.args_parsed.location).await?;
let file_location = if let Some(location) =
self.args_parsed.location.clone().strip_prefix('@')
{
FileLocation::Stage(location.to_string())
} else if let Some(connection_name) = &self.args_parsed.connection_name {
let conn = self.ctx.get_connection(connection_name).await?;
let uri = UriLocation::from_uri(
self.args_parsed.location.clone(),
"".to_string(),
conn.storage_params,
)?;
let proto = conn.storage_type.parse::<Scheme>()?;
if proto != uri.protocol.parse::<Scheme>()? {
return Err(ErrorCode::BadArguments(format!(
"protocol from connection_name={connection_name} ({proto}) not match with uri protocol ({0}).",
uri.protocol
)));
}
FileLocation::Uri(uri)
} else {
let uri = UriLocation::from_uri(
self.args_parsed.location.clone(),
"".to_string(),
BTreeMap::default(),
)?;
FileLocation::Uri(uri)
};
let (stage_info, path) = resolve_file_location(&self.ctx, &file_location).await?;
let enable_experimental_rbac_check = self
.ctx
.get_settings()
Expand Down
16 changes: 7 additions & 9 deletions src/query/service/src/table_functions/infer_schema/table_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_storages_fuse::table_functions::string_value;
#[derive(Clone)]
pub(crate) struct InferSchemaArgsParsed {
pub(crate) location: String,
pub(crate) connection_name: Option<String>,
pub(crate) file_format: Option<String>,
pub(crate) files_info: StageFilesInfo,
}
Expand All @@ -30,6 +31,7 @@ impl InferSchemaArgsParsed {
let args = table_args.expect_all_named("infer_schema")?;

let mut location = None;
let mut connection_name = None;
let mut file_format = None;
let mut files_info = StageFilesInfo {
path: "".to_string(),
Expand All @@ -40,15 +42,10 @@ impl InferSchemaArgsParsed {
for (k, v) in &args {
match k.to_lowercase().as_str() {
"location" => {
let v = string_value(v)?;
if let Some(name) = v.strip_prefix('@') {
location = Some(name.to_string());
} else {
return Err(ErrorCode::BadArguments(format!(
"location must start with @, but got {}",
v
)));
}
location = Some(string_value(v)?);
}
"connection_name" => {
connection_name = Some(string_value(v)?);
}
"pattern" => {
files_info.pattern = Some(string_value(v)?);
Expand All @@ -71,6 +68,7 @@ impl InferSchemaArgsParsed {

Ok(Self {
location,
connection_name,
file_format,
files_info,
})
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/binder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub use binder::Binder;
pub use builders::*;
pub use column_binding::ColumnBinding;
pub use column_binding::ColumnBindingBuilder;
pub use copy_into_table::resolve_file_location;
pub use copy_into_table::resolve_stage_location;
pub use internal_column_factory::INTERNAL_COLUMN_FACTORY;
pub use location::parse_uri_location;
Expand Down
12 changes: 12 additions & 0 deletions tests/sqllogictests/suites/stage/formats/parquet/infer_schema.test
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,15 @@ select * from infer_schema(location => '@data/parquet/', FILE_FORMAT => 'PARQUET
----
id INT 0 0
t TUPLE(A INT32, B STRING) 0 1

statement ok
drop CONNECTION IF EXISTS my_conn

statement ok
create CONNECTION my_conn STORAGE_TYPE = 's3' access_key_id='minioadmin' secret_access_key='minioadmin' endpoint_url='http://127.0.0.1:9900/'

query
select * from INFER_SCHEMA(location => 's3://testbucket/data/parquet/tuple.parquet', connection_name => 'my_conn')
----
id INT 0 0
t TUPLE(A INT32, B STRING) 0 1