feat: add unload options `overwrite`, `include_query_id` and `use_raw_path`. (#16614)

* feat: add unload options include_query_id and use_raw_path.

* feat: add unload options overwrite, default false.

* update tests.

* fix typo
youngsofun authored Oct 16, 2024

1 parent f147d3d commit a37384b
Showing 21 changed files with 217 additions and 102 deletions.
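For context, a hedged sketch of the SQL surface this commit adds (stage and table names are hypothetical; the defaults come from the new CopyIntoLocationOptions below: include_query_id = true, use_raw_path = false, overwrite = false):

copy into @my_stage/out.csv from my_table
    file_format = (type = csv)
    single = true             -- required when use_raw_path = true
    use_raw_path = true       -- write to exactly 'out.csv', no generated file name
    include_query_id = false  -- only allowed when use_raw_path = true
    overwrite = true;         -- only allowed when single = true and use_raw_path = true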
1 change: 0 additions & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion src/query/ast/src/ast/format/syntax/dml.rs
@@ -236,7 +236,7 @@ pub(crate) fn pretty_copy_into_location(copy_stmt: CopyIntoLocationStmt) -> RcDo
.append(
RcDoc::line()
.append(RcDoc::text("SINGLE = "))
.append(RcDoc::text(copy_stmt.single.to_string())),
.append(RcDoc::text(copy_stmt.options.single.to_string())),
)
}

48 changes: 39 additions & 9 deletions src/query/ast/src/ast/statements/copy.rs
@@ -143,6 +143,29 @@ impl Display for CopyIntoTableStmt {
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
pub struct CopyIntoLocationOptions {
pub single: bool,
pub max_file_size: usize,
pub detailed_output: bool,
pub use_raw_path: bool,
pub include_query_id: bool,
pub overwrite: bool,
}

impl Default for CopyIntoLocationOptions {
fn default() -> Self {
Self {
single: Default::default(),
max_file_size: Default::default(),
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
}
}
}

/// CopyIntoLocationStmt is the parsed statement of `COPY into <location> from <table> ...`
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CopyIntoLocationStmt {
@@ -151,9 +174,7 @@ pub struct CopyIntoLocationStmt {
pub src: CopyIntoLocationSource,
pub dst: FileLocation,
pub file_format: FileFormatOptions,
pub single: bool,
pub max_file_size: usize,
pub detailed_output: bool,
pub options: CopyIntoLocationOptions,
}

impl Display for CopyIntoLocationStmt {
@@ -171,9 +192,12 @@ impl Display for CopyIntoLocationStmt {
if !self.file_format.is_empty() {
write!(f, " FILE_FORMAT = ({})", self.file_format)?;
}
write!(f, " SINGLE = {}", self.single)?;
write!(f, " MAX_FILE_SIZE = {}", self.max_file_size)?;
write!(f, " DETAILED_OUTPUT = {}", self.detailed_output)?;
write!(f, " SINGLE = {}", self.options.single)?;
write!(f, " MAX_FILE_SIZE = {}", self.options.max_file_size)?;
write!(f, " DETAILED_OUTPUT = {}", self.options.detailed_output)?;
write!(f, " INCLUDE_QUERY_ID = {}", self.options.include_query_id)?;
write!(f, " USE_RAW_PATH = {}", self.options.use_raw_path)?;
write!(f, " OVERWRITE = {}", self.options.overwrite)?;

Ok(())
}
@@ -183,9 +207,12 @@ impl CopyIntoLocationStmt {
pub fn apply_option(&mut self, opt: CopyIntoLocationOption) {
match opt {
CopyIntoLocationOption::FileFormat(v) => self.file_format = v,
CopyIntoLocationOption::Single(v) => self.single = v,
CopyIntoLocationOption::MaxFileSize(v) => self.max_file_size = v,
CopyIntoLocationOption::DetailedOutput(v) => self.detailed_output = v,
CopyIntoLocationOption::Single(v) => self.options.single = v,
CopyIntoLocationOption::MaxFileSize(v) => self.options.max_file_size = v,
CopyIntoLocationOption::DetailedOutput(v) => self.options.detailed_output = v,
CopyIntoLocationOption::IncludeQueryID(v) => self.options.include_query_id = v,
CopyIntoLocationOption::UseRawPath(v) => self.options.use_raw_path = v,
CopyIntoLocationOption::OverWrite(v) => self.options.overwrite = v,
}
}
}
@@ -482,7 +509,10 @@ pub enum CopyIntoLocationOption {
FileFormat(FileFormatOptions),
MaxFileSize(usize),
Single(bool),
IncludeQueryID(bool),
UseRawPath(bool),
DetailedOutput(bool),
OverWrite(bool),
}

#[derive(Clone, Debug, PartialEq, Eq, Default, Drive, DriveMut)]
16 changes: 13 additions & 3 deletions src/query/ast/src/parser/copy.rs
@@ -110,9 +110,7 @@ fn copy_into_location(i: Input) -> IResult<Statement> {
src,
dst,
file_format: Default::default(),
single: Default::default(),
max_file_size: Default::default(),
detailed_output: false,
options: Default::default(),
};
for opt in opts {
copy_stmt.apply_option(opt);
@@ -210,6 +208,18 @@ fn copy_into_location_option(i: Input) -> IResult<CopyIntoLocationOption> {
rule! { DETAILED_OUTPUT ~ "=" ~ #literal_bool },
|(_, _, detailed_output)| CopyIntoLocationOption::DetailedOutput(detailed_output),
),
map(
rule! { USE_RAW_PATH ~ "=" ~ #literal_bool },
|(_, _, use_raw_path)| CopyIntoLocationOption::UseRawPath(use_raw_path),
),
map(
rule! { INCLUDE_QUERY_ID ~ "=" ~ #literal_bool },
|(_, _, include_query_id)| CopyIntoLocationOption::IncludeQueryID(include_query_id),
),
map(
rule! { OVERWRITE ~ "=" ~ #literal_bool },
|(_, _, include_query_id)| CopyIntoLocationOption::OverWrite(include_query_id),
),
map(rule! { #file_format_clause }, |options| {
CopyIntoLocationOption::FileFormat(options)
}),
4 changes: 4 additions & 0 deletions src/query/ast/src/parser/token.rs
@@ -683,6 +683,8 @@ pub enum TokenKind {
IF,
#[token("IN", ignore(ascii_case))]
IN,
#[token("INCLUDE_QUERY_ID", ignore(ascii_case))]
INCLUDE_QUERY_ID,
#[token("INCREMENTAL", ignore(ascii_case))]
INCREMENTAL,
#[token("INDEX", ignore(ascii_case))]
@@ -1064,6 +1066,8 @@ pub enum TokenKind {
SYNTAX,
#[token("USAGE", ignore(ascii_case))]
USAGE,
#[token("USE_RAW_PATH", ignore(ascii_case))]
USE_RAW_PATH,
#[token("UPDATE", ignore(ascii_case))]
UPDATE,
#[token("UPLOAD", ignore(ascii_case))]
43 changes: 29 additions & 14 deletions src/query/ast/tests/it/testdata/stmt.txt
@@ -14252,7 +14252,7 @@ COPY INTO 's3://mybucket/data.csv'
skip_header = 1
)
---------- Output ---------
COPY INTO 's3://mybucket/data.csv' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
COPY INTO 's3://mybucket/data.csv' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false OVERWRITE = false
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
@@ -14301,9 +14301,14 @@ CopyIntoLocation(
),
},
},
single: false,
max_file_size: 0,
detailed_output: false,
options: CopyIntoLocationOptions {
single: false,
max_file_size: 0,
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
},
},
)

@@ -14312,7 +14317,7 @@ CopyIntoLocation(
COPY INTO '@my_stage/my data'
FROM mytable;
---------- Output ---------
COPY INTO '@my_stage/my data' FROM mytable SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
COPY INTO '@my_stage/my data' FROM mytable SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false OVERWRITE = false
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
@@ -14339,9 +14344,14 @@ CopyIntoLocation(
file_format: FileFormatOptions {
options: {},
},
single: false,
max_file_size: 0,
detailed_output: false,
options: CopyIntoLocationOptions {
single: false,
max_file_size: 0,
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
},
},
)

@@ -14356,7 +14366,7 @@ COPY INTO @my_stage
skip_header = 1
);
---------- Output ---------
COPY INTO '@my_stage' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
COPY INTO '@my_stage' FROM mytable FILE_FORMAT = (field_delimiter = ',', record_delimiter = '\n', skip_header = 1, type = CSV) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false OVERWRITE = false
---------- AST ------------
CopyIntoLocation(
CopyIntoLocationStmt {
@@ -14396,9 +14406,14 @@ CopyIntoLocation(
),
},
},
single: false,
max_file_size: 0,
detailed_output: false,
options: CopyIntoLocationOptions {
single: false,
max_file_size: 0,
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
},
},
)

@@ -19827,7 +19842,7 @@ CreateTask(
---------- Input ----------
CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 13 * * *' AS COPY INTO @my_internal_stage FROM canadian_city_population FILE_FORMAT = (TYPE = PARQUET)
---------- Output ---------
CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 13 * * *' AS COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false
CREATE TASK IF NOT EXISTS MyTask1 SCHEDULE = USING CRON '0 13 * * *' AS COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false OVERWRITE = false
---------- AST ------------
CreateTask(
CreateTaskStmt {
@@ -19849,7 +19864,7 @@ CreateTask(
after: [],
when_condition: None,
sql: SingleStatement(
"COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false",
"COPY INTO '@my_internal_stage' FROM canadian_city_population FILE_FORMAT = (type = PARQUET) SINGLE = false MAX_FILE_SIZE = 0 DETAILED_OUTPUT = false INCLUDE_QUERY_ID = true USE_RAW_PATH = false OVERWRITE = false",
),
},
)
@@ -17,6 +17,7 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::Arc;

use databend_common_ast::ast::CopyIntoLocationOptions;
use databend_common_exception::Result;
use databend_common_expression::RemoteExpr;
use databend_common_expression::TableSchema;
@@ -40,6 +41,7 @@ pub struct StageTableInfo {
// - may need to be purged as well (depends on the copy options)
pub duplicated_files_detected: Vec<String>,
pub is_select: bool,
pub copy_into_location_options: CopyIntoLocationOptions,
}

impl StageTableInfo {
@@ -14,6 +14,7 @@

use std::sync::Arc;

use databend_common_ast::ast::CopyIntoLocationOptions;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_exception::Result;
@@ -86,6 +87,7 @@ impl CopyIntoLocationInterpreter {
stage: &StageInfo,
path: &str,
query: &Plan,
options: &CopyIntoLocationOptions,
) -> Result<(PipelineBuildResult, Vec<UpdateStreamMetaReq>)> {
let (query_interpreter, update_stream_meta_req) = self.build_query(query).await?;
let query_physical_plan = query_interpreter.build_physical_plan().await?;
@@ -109,6 +111,7 @@ impl CopyIntoLocationInterpreter {
duplicated_files_detected: vec![],
is_select: false,
default_values: None,
copy_into_location_options: options.clone(),
},
}));

@@ -145,6 +148,7 @@ impl Interpreter for CopyIntoLocationInterpreter {
&self.plan.stage,
&self.plan.path,
&self.plan.from,
&self.plan.options,
)
.await?;

3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -1309,6 +1309,7 @@ impl TableContext for QueryContext {
duplicated_files_detected: vec![],
is_select: true,
default_values: None,
copy_into_location_options: Default::default(),
};
OrcTable::try_create(info).await
}
@@ -1325,6 +1326,7 @@ impl TableContext for QueryContext {
duplicated_files_detected: vec![],
is_select: true,
default_values: None,
copy_into_location_options: Default::default(),
};
StageTable::try_create(info)
}
@@ -1359,6 +1361,7 @@ impl TableContext for QueryContext {
duplicated_files_detected: vec![],
is_select: true,
default_values: None,
copy_into_location_options: Default::default(),
};
StageTable::try_create(info)
}
68 changes: 42 additions & 26 deletions src/query/sql/src/planner/binder/copy_into_location.rs
@@ -19,7 +19,8 @@ use databend_common_ast::parser::parse_sql;
use databend_common_ast::parser::tokenize_sql;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::StageInfo;
use databend_common_storage::init_stage_operator;
use opendal::ErrorKind;

use crate::binder::copy_into_table::resolve_file_location;
use crate::binder::Binder;
@@ -34,6 +35,22 @@ impl<'a> Binder {
bind_context: &mut BindContext,
stmt: &CopyIntoLocationStmt,
) -> Result<Plan> {
if stmt.options.use_raw_path && !stmt.options.single {
return Err(ErrorCode::InvalidArgument(
"use_raw_path=true can only be set when single=true",
));
}
if stmt.options.overwrite && (!stmt.options.single || !stmt.options.use_raw_path) {
return Err(ErrorCode::InvalidArgument(
"overwrite=true can only be set when single=true and use_raw_path=true for now",
));
}
if !stmt.options.include_query_id && !stmt.options.use_raw_path {
return Err(ErrorCode::InvalidArgument(
"include_query_id=false can only be set when use_raw_path=true",
));
}

let query = match &stmt.src {
CopyIntoLocationSource::Table(table) => {
let (catalog_name, database_name, table_name) = self
@@ -72,36 +89,35 @@ impl<'a> Binder {
}?;

let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), &stmt.dst).await?;
self.apply_copy_into_location_options(stmt, &mut stage_info)
.await?;

if stmt.options.use_raw_path {
if path.ends_with("/") {
return Err(ErrorCode::BadArguments(
"when use_raw_path is set to true, url path can not end with '/'",
));
}
let op = init_stage_operator(&stage_info)?;
if !stmt.options.overwrite {
match op.stat(&path).await {
Ok(_) => return Err(ErrorCode::BadArguments("file already exists")),
Err(e) => {
if e.kind() != ErrorKind::NotFound {
return Err(e.into());
}
}
}
}
}

if !stmt.file_format.is_empty() {
stage_info.file_format_params = self.try_resolve_file_format(&stmt.file_format).await?;
}

Ok(Plan::CopyIntoLocation(CopyIntoLocationPlan {
stage: Box::new(stage_info),
path,
from: Box::new(query),
options: stmt.options.clone(),
}))
}

#[async_backtrace::framed]
pub async fn apply_copy_into_location_options(
&mut self,
stmt: &CopyIntoLocationStmt,
stage: &mut StageInfo,
) -> Result<()> {
if !stmt.file_format.is_empty() {
stage.file_format_params = self.try_resolve_file_format(&stmt.file_format).await?;
}

// Copy options.
{
// max_file_size.
if stmt.max_file_size != 0 {
stage.copy_options.max_file_size = stmt.max_file_size;
}
stage.copy_options.single = stmt.single;
stage.copy_options.detailed_output = stmt.detailed_output;
}

Ok(())
}
}
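To make the binder validation above concrete, a sketch of statements it now rejects (hypothetical stage @s and table t):

copy into @s/out.csv from t use_raw_path = true;                -- error: use_raw_path=true can only be set when single=true
copy into @s/dir/ from t single = true use_raw_path = true;     -- error: url path can not end with '/'
copy into @s/out/ from t include_query_id = false;              -- error: include_query_id=false requires use_raw_path=true
copy into @s/out.csv from t single = true use_raw_path = true;  -- error if @s/out.csv already exists, since overwrite defaults to false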
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
@@ -204,6 +204,7 @@ impl<'a> Binder {
duplicated_files_detected: vec![],
is_select: false,
default_values,
copy_into_location_options: Default::default(),
},
values_consts: vec![],
required_source_schema: required_values_schema.clone(),
@@ -363,6 +364,7 @@ impl<'a> Binder {
duplicated_files_detected,
is_select: false,
default_values: Some(default_values),
copy_into_location_options: Default::default(),
},
write_mode,
query: None,
18 changes: 11 additions & 7 deletions src/query/sql/src/planner/optimizer/optimizer.rs
@@ -238,13 +238,17 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan>
partial,
plan: Box::new(Box::pin(optimize(opt_ctx, *plan)).await?),
}),
Plan::CopyIntoLocation(CopyIntoLocationPlan { stage, path, from }) => {
Ok(Plan::CopyIntoLocation(CopyIntoLocationPlan {
stage,
path,
from: Box::new(Box::pin(optimize(opt_ctx, *from)).await?),
}))
}
Plan::CopyIntoLocation(CopyIntoLocationPlan {
stage,
path,
from,
options,
}) => Ok(Plan::CopyIntoLocation(CopyIntoLocationPlan {
stage,
path,
from: Box::new(Box::pin(optimize(opt_ctx, *from)).await?),
options,
})),
Plan::CopyIntoTable(mut plan) if !plan.no_file_to_copy => {
plan.enable_distributed = opt_ctx.enable_distributed_optimization
&& opt_ctx
4 changes: 3 additions & 1 deletion src/query/sql/src/planner/plans/copy_into_location.rs
@@ -15,6 +15,7 @@
use std::fmt::Debug;
use std::fmt::Formatter;

use databend_common_ast::ast::CopyIntoLocationOptions;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::DataField;
@@ -29,11 +30,12 @@ pub struct CopyIntoLocationPlan {
pub stage: Box<StageInfo>,
pub path: String,
pub from: Box<Plan>,
pub options: CopyIntoLocationOptions,
}

impl CopyIntoLocationPlan {
pub fn schema(&self) -> DataSchemaRef {
if self.stage.copy_options.detailed_output {
if self.options.detailed_output {
DataSchemaRefExt::create(vec![
DataField::new("file_name", DataType::String),
DataField::new("file_size", DataType::Number(NumberDataType::UInt64)),
2 changes: 0 additions & 2 deletions src/query/storages/stage/Cargo.toml
@@ -44,8 +44,6 @@ serde = { workspace = true }
serde_json = { workspace = true }
typetag = { workspace = true }

uuid = { workspace = true }

[build-dependencies]
databend-common-building = { workspace = true }

8 changes: 4 additions & 4 deletions src/query/storages/stage/src/append/do_append.rs
@@ -38,14 +38,14 @@ impl StageTable {
let max_threads = settings.get_max_threads()? as usize;

let op = StageTable::get_op(&self.table_info.stage_info)?;
let uuid = uuid::Uuid::new_v4().to_string();
let query_id = ctx.get_id();
let group_id = AtomicUsize::new(0);
match fmt {
FileFormatParams::Parquet(_) => append_data_to_parquet_files(
pipeline,
self.table_info.clone(),
op,
uuid,
query_id,
&group_id,
mem_limit,
max_threads,
@@ -55,13 +55,13 @@ impl StageTable {
ctx.clone(),
self.table_info.clone(),
op,
uuid,
query_id,
&group_id,
mem_limit,
max_threads,
)?,
};
if !self.table_info.stage_info.copy_options.detailed_output {
if !self.table_info.copy_into_location_options.detailed_output {
pipeline.try_resize(1)?;
pipeline.add_accumulating_transformer(SumSummaryTransform::default);
}
8 changes: 4 additions & 4 deletions src/query/storages/stage/src/append/parquet_file/pipeline.rs
@@ -27,13 +27,13 @@ pub(crate) fn append_data_to_parquet_files(
pipeline: &mut Pipeline,
table_info: StageTableInfo,
op: Operator,
uuid: String,
query_id: String,
group_id: &std::sync::atomic::AtomicUsize,
mem_limit: usize,
max_threads: usize,
) -> Result<()> {
let is_single = table_info.stage_info.copy_options.single;
let max_file_size = table_info.stage_info.copy_options.max_file_size;
let is_single = table_info.copy_into_location_options.single;
let max_file_size = table_info.copy_into_location_options.max_file_size;
// when serializing block to parquet, the memory may be doubled
let mem_limit = mem_limit / 2;
pipeline.try_resize(1)?;
@@ -60,7 +60,7 @@ pub(crate) fn append_data_to_parquet_files(
output,
table_info.clone(),
op.clone(),
uuid.clone(),
query_id.clone(),
gid,
max_file_size,
)
@@ -62,7 +62,7 @@ pub struct ParquetFileWriter {
unload_output: UnloadOutput,
unload_output_blocks: Option<VecDeque<DataBlock>>,

uuid: String,
query_id: String,
group_id: usize,
batch_id: usize,

@@ -100,12 +100,12 @@ impl ParquetFileWriter {
output: Arc<OutputPort>,
table_info: StageTableInfo,
data_accessor: Operator,
uuid: String,
query_id: String,
group_id: usize,
targe_file_size: Option<usize>,
) -> Result<ProcessorPtr> {
let unload_output =
UnloadOutput::create(table_info.stage_info.copy_options.detailed_output);
UnloadOutput::create(table_info.copy_into_location_options.detailed_output);

let arrow_schema = Arc::new(table_schema_to_arrow_schema(&table_info.schema));
let writer = create_writer(arrow_schema.clone(), targe_file_size)?;
@@ -122,7 +122,7 @@ impl ParquetFileWriter {
input_bytes: 0,
file_to_write: None,
data_accessor,
uuid,
query_id,
group_id,
batch_id: 0,
targe_file_size,
@@ -242,7 +242,7 @@ impl Processor for ParquetFileWriter {
assert!(self.file_to_write.is_some());
let path = unload_path(
&self.table_info,
&self.uuid,
&self.query_id,
self.group_id,
self.batch_id,
None,
40 changes: 24 additions & 16 deletions src/query/storages/stage/src/append/path.rs
@@ -17,7 +17,7 @@ use databend_common_compress::CompressAlgorithm;

pub fn unload_path(
stage_table_info: &StageTableInfo,
uuid: &str,
query_id: &str,
group_id: usize,
batch_id: usize,
compression: Option<CompressAlgorithm>,
@@ -33,23 +33,31 @@ pub fn unload_path(
.unwrap_or_default();

let path = &stage_table_info.files_info.path;

if path.ends_with("data_") {
format!(
"{}{}_{:0>4}_{:0>8}.{}{}",
path, uuid, group_id, batch_id, format_name, suffix
)
if stage_table_info.copy_into_location_options.use_raw_path {
path.to_string()
} else {
let (path, sep) = if path == "/" {
("", "")
} else if path.ends_with('/') {
(path.as_str(), "")
let query_id = if stage_table_info.copy_into_location_options.include_query_id {
format!("{query_id}_")
} else {
(path.as_str(), "/")
"".to_string()
};
format!(
"{}{}data_{}_{:0>4}_{:0>8}.{}{}",
path, sep, uuid, group_id, batch_id, format_name, suffix
)
if path.ends_with("data_") {
format!(
"{}{}{:0>4}_{:0>8}.{}{}",
path, query_id, group_id, batch_id, format_name, suffix
)
} else {
let (path, sep) = if path == "/" {
("", "")
} else if path.ends_with('/') {
(path.as_str(), "")
} else {
(path.as_str(), "/")
};
format!(
"{}{}data_{}{:0>4}_{:0>8}.{}{}",
path, sep, query_id, group_id, batch_id, format_name, suffix
)
}
}
}
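To illustrate the naming logic above, a sketch of the paths unload_path now produces (hypothetical stage @s and table t; assuming CSV output resolves to a .csv extension, group 0, batch 0, and <query_id> standing in for the actual query id):

copy into @s/out/ from t;                                        -- out/data_<query_id>_0000_00000000.csv
copy into @s/out from t;                                         -- out/data_<query_id>_0000_00000000.csv (a '/' separator is inserted)
copy into @s/exact.csv from t single = true use_raw_path = true; -- exact.csv, the raw path used verbatim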
@@ -37,13 +37,13 @@ pub(crate) fn append_data_to_row_based_files(
ctx: Arc<dyn TableContext>,
table_info: StageTableInfo,
op: Operator,
uuid: String,
query_id: String,
group_id: &std::sync::atomic::AtomicUsize,
mem_limit: usize,
max_threads: usize,
) -> Result<()> {
let is_single = table_info.stage_info.copy_options.single;
let max_file_size = table_info.stage_info.copy_options.max_file_size;
let is_single = table_info.copy_into_location_options.single;
let max_file_size = table_info.copy_into_location_options.max_file_size;
let compression = table_info.stage_info.file_format_params.compression();
// when serializing block to parquet, the memory may be doubled
let mem_limit = mem_limit / 2;
@@ -101,7 +101,7 @@ pub(crate) fn append_data_to_row_based_files(
table_info.clone(),
op.clone(),
prefix.clone(),
uuid.clone(),
query_id.clone(),
gid,
compression,
)
@@ -52,7 +52,7 @@ pub struct RowBasedFileWriter {
data_accessor: Operator,
prefix: Vec<u8>,

uuid: String,
query_id: String,
group_id: usize,
batch_id: usize,

@@ -66,19 +66,19 @@ impl RowBasedFileWriter {
table_info: StageTableInfo,
data_accessor: Operator,
prefix: Vec<u8>,
uuid: String,
query_id: String,
group_id: usize,
compression: Option<CompressAlgorithm>,
) -> Result<ProcessorPtr> {
let unload_output =
UnloadOutput::create(table_info.stage_info.copy_options.detailed_output);
UnloadOutput::create(table_info.copy_into_location_options.detailed_output);
Ok(ProcessorPtr::create(Box::new(RowBasedFileWriter {
table_info,
input,
input_data: None,
data_accessor,
prefix,
uuid,
query_id,
group_id,
batch_id: 0,
file_to_write: None,
@@ -172,7 +172,7 @@ impl Processor for RowBasedFileWriter {
async fn async_process(&mut self) -> Result<()> {
let path = unload_path(
&self.table_info,
&self.uuid,
&self.query_id,
self.group_id,
self.batch_id,
self.compression,
18 changes: 18 additions & 0 deletions tests/sqllogictests/suites/stage/unload.test
@@ -81,3 +81,21 @@ select $1, $2 from @unload(file_format => 'tsv');
1 2
3 4
5 6

query
copy into @unload/a_raw_path.csv from (select 1,2) file_format=(type=csv) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
----
a_raw_path.csv 4 1

query
copy into @unload/a_raw_path.csv from (select 3,4) file_format=(type=csv) single=true include_query_id=false use_raw_path=true detailed_output=true overwrite=true;
----
a_raw_path.csv 4 1

query
select $1, $2 from @unload/a_raw_path.csv (file_format => 'csv');
----
3 4

statement error 1006.*file already exists
copy into @unload/a_raw_path.csv from (select 3,4) file_format=(type=csv) single=true include_query_id=false use_raw_path=true detailed_output=false overwrite=false;
