Skip to content

Commit

Permalink
feat: copy option 'pattern' support variable.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Sep 26, 2024
1 parent bb92fdc commit 6fe9c16
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 18 deletions.
7 changes: 4 additions & 3 deletions src/query/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::ast::quote::QuotedString;
use crate::ast::write_comma_separated_map;
use crate::ast::write_comma_separated_string_list;
use crate::ast::write_comma_separated_string_map;
use crate::ast::Expr;
use crate::ast::Hint;
use crate::ast::Identifier;
use crate::ast::Query;
Expand Down Expand Up @@ -54,7 +55,7 @@ pub struct CopyIntoTableStmt {

// files to load
pub files: Option<Vec<String>>,
pub pattern: Option<String>,
pub pattern: Option<Expr>,
pub force: bool,

// copy options
Expand Down Expand Up @@ -110,7 +111,7 @@ impl Display for CopyIntoTableStmt {
}

if let Some(pattern) = &self.pattern {
write!(f, " PATTERN = '{}'", pattern)?;
write!(f, " PATTERN = {}", pattern)?;
}

if !self.file_format.is_empty() {
Expand Down Expand Up @@ -440,7 +441,7 @@ impl Display for FileLocation {

pub enum CopyIntoTableOption {
Files(Vec<String>),
Pattern(String),
Pattern(Expr),
FileFormat(FileFormatOptions),
ValidationMode(String),
SizeLimit(usize),
Expand Down
11 changes: 6 additions & 5 deletions src/query/ast/src/ast/statements/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use derive_visitor::DriveMut;
use crate::ast::write_comma_separated_string_list;
use crate::ast::write_comma_separated_string_map;
use crate::ast::CreateOption;
use crate::ast::Expr;
use crate::ast::FileFormatOptions;
use crate::ast::UriLocation;

Expand Down Expand Up @@ -80,10 +81,10 @@ impl Display for CreateStageStmt {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum SelectStageOption {
Files(Vec<String>),
Pattern(String),
Pattern(Expr),
FileFormat(String),
Connection(BTreeMap<String, String>),
}
Expand All @@ -103,10 +104,10 @@ impl SelectStageOptions {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Default, Drive, DriveMut)]
#[derive(Debug, Clone, PartialEq, Default, Drive, DriveMut)]
pub struct SelectStageOptions {
pub files: Option<Vec<String>>,
pub pattern: Option<String>,
pub pattern: Option<Expr>,
pub file_format: Option<String>,
pub connection: BTreeMap<String, String>,
}
Expand Down Expand Up @@ -150,7 +151,7 @@ impl Display for SelectStageOptions {
}

if let Some(pattern) = self.pattern.as_ref() {
write!(f, " PATTERN => '{}',", pattern)?;
write!(f, " PATTERN => {},", pattern)?;
}

if !self.connection.is_empty() {
Expand Down
8 changes: 4 additions & 4 deletions src/query/ast/src/parser/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::parser::common::ident;
use crate::parser::common::table_ref;
use crate::parser::common::IResult;
use crate::parser::common::*;
use crate::parser::expr::expr;
use crate::parser::expr::literal_bool;
use crate::parser::expr::literal_string;
use crate::parser::expr::literal_u64;
Expand Down Expand Up @@ -144,10 +145,9 @@ fn copy_into_table_option(i: Input) -> IResult<CopyIntoTableOption> {
rule! { FILES ~ "=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" },
|(_, _, _, files, _)| CopyIntoTableOption::Files(files),
),
map(
rule! { PATTERN ~ "=" ~ #literal_string },
|(_, _, pattern)| CopyIntoTableOption::Pattern(pattern),
),
map(rule! { PATTERN ~ "=" ~ #expr }, |(_, _, pattern)| {
CopyIntoTableOption::Pattern(pattern)
}),
map(rule! { #file_format_clause }, |options| {
CopyIntoTableOption::FileFormat(options)
}),
Expand Down
7 changes: 3 additions & 4 deletions src/query/ast/src/parser/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,9 @@ pub fn select_stage_option(i: Input) -> IResult<SelectStageOption> {
rule! { FILES ~ ^"=>" ~ ^"(" ~ ^#comma_separated_list0(literal_string) ~ ^")" },
|(_, _, _, files, _)| SelectStageOption::Files(files),
),
map(
rule! { PATTERN ~ ^"=>" ~ ^#literal_string },
|(_, _, pattern)| SelectStageOption::Pattern(pattern),
),
map(rule! { PATTERN ~ ^"=>" ~ ^#expr }, |(_, _, pattern)| {
SelectStageOption::Pattern(pattern)
}),
map(
rule! { FILE_FORMAT ~ ^"=>" ~ ^#literal_string },
|(_, _, file_format)| SelectStageOption::FileFormat(file_format),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ impl Binder {
_ => databend_common_base::runtime::block_on(self.ctx.get_file_format(f))?,
}
}
let pattern = match &options.pattern {
None => None,
Some(pattern) => Some(Self::resolve_copy_pattern(self.ctx.clone(), pattern)?),
};

let files_info = StageFilesInfo {
path,
pattern: options.pattern.clone(),
pattern,
files: options.files.clone(),
};
let table_ctx = self.ctx.clone();
Expand Down
49 changes: 48 additions & 1 deletion src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use databend_common_meta_app::principal::NullAs;
use databend_common_meta_app::principal::OnErrorMode;
use databend_common_meta_app::principal::StageInfo;
use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT;
use databend_common_meta_app::tenant::Tenant;
use databend_common_settings::Settings;
use databend_common_storage::StageFilesInfo;
use databend_common_users::UserApiProvider;
use derive_visitor::Drive;
Expand All @@ -65,6 +67,7 @@ use parking_lot::RwLock;
use crate::binder::bind_query::MaxColumnPosition;
use crate::binder::location::parse_uri_location;
use crate::binder::Binder;
use crate::plans::ConstantExpr;
use crate::plans::CopyIntoTableMode;
use crate::plans::CopyIntoTablePlan;
use crate::plans::Plan;
Expand All @@ -73,6 +76,7 @@ use crate::BindContext;
use crate::Metadata;
use crate::NameResolutionContext;
use crate::ScalarBinder;
use crate::TypeChecker;

impl<'a> Binder {
#[async_backtrace::framed]
Expand Down Expand Up @@ -111,6 +115,44 @@ impl<'a> Binder {
}
}

fn resolve_const_expr(ctx: Arc<dyn TableContext>, expr: &Expr) -> Result<Scalar> {
let settings = Settings::create(Tenant::new_literal("dummy"));
let mut bind_context = BindContext::new();
let metadata = Metadata::default();

let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut type_checker = TypeChecker::try_create(
&mut bind_context,
ctx.clone(),
&name_resolution_ctx,
Arc::new(RwLock::new(metadata)),
&[],
false,
)?;
let (scalar, _) = *type_checker.resolve(expr)?;
if let Ok(arg) = ConstantExpr::try_from(scalar) {
Ok(arg.value)
} else {
Err(ErrorCode::BadArguments(format!(
"except const expr, got {expr}"
)))
}
}

pub(crate) fn resolve_copy_pattern(
ctx: Arc<dyn TableContext>,
pattern: &Expr,
) -> Result<String> {
let c = Self::resolve_const_expr(ctx.clone(), pattern)?;
if let Scalar::String(s) = c {
Ok(s)
} else {
Err(ErrorCode::BadArguments(format!(
"invalid pattern expr: {c}"
)))
}
}

async fn bind_copy_into_table_common(
&mut self,
bind_context: &mut BindContext,
Expand All @@ -136,10 +178,15 @@ impl<'a> Binder {
let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), location).await?;
self.apply_copy_into_table_options(stmt, &mut stage_info)
.await?;
let pattern = match &stmt.pattern {
None => None,
Some(pattern) => Some(Self::resolve_copy_pattern(self.ctx.clone(), pattern)?),
};

let files_info = StageFilesInfo {
path,
files: stmt.files.clone(),
pattern: stmt.pattern.clone(),
pattern,
};
let required_values_schema: DataSchemaRef = Arc::new(
match &stmt.dst_columns {
Expand Down
31 changes: 31 additions & 0 deletions tests/sqllogictests/suites/stage/options/pattern_variable.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
statement ok
create or replace table t1(c1 int, c2 string);

statement ok
set variable pt='it.csv';

query
select $1 from @data/csv/ (pattern => $pt, file_format=>'csv')
----
1
2

query
copy into t1 from @data/csv/ pattern= $pt file_format=(type=csv)
----
csv/it.csv 2 0 NULL NULL

query
select * from t1
----
1 b
2 d

statement ok
unset variable pt;

query error 1006
copy into t1 from @data/csv/ pattern= $p file_format=(type=csv)

query error 1065
copy into t1 from @data/csv/ pattern=x file_format=(type=csv)

0 comments on commit 6fe9c16

Please sign in to comment.