From 9b38e65589ee816cf89216627c68e22ea5b5b729 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 16 Jan 2025 14:05:48 +0800 Subject: [PATCH] fix: copy into table sometimes collects files twice. (#17300) * fix: copy into table sometimes collects files twice. * add log for schema inference --- src/query/sql/src/planner/binder/copy_into_table.rs | 2 ++ src/query/sql/src/planner/plans/copy_into_table.rs | 6 ++++++ .../storages/parquet/src/parquet_rs/parquet_table/table.rs | 5 ++++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index c59d9baede093..fd1a0d6fb6655 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -223,6 +223,7 @@ impl Binder { write_mode: CopyIntoTableMode::Copy, query: None, enable_distributed: false, + files_collected: false, }) } @@ -400,6 +401,7 @@ impl Binder { enable_distributed: false, is_transform: false, + files_collected: true, }; self.bind_copy_into_table_from_location(bind_context, plan) diff --git a/src/query/sql/src/planner/plans/copy_into_table.rs b/src/query/sql/src/planner/plans/copy_into_table.rs index e5a43486e85ea..92325edab46c1 100644 --- a/src/query/sql/src/planner/plans/copy_into_table.rs +++ b/src/query/sql/src/planner/plans/copy_into_table.rs @@ -137,10 +137,16 @@ pub struct CopyIntoTablePlan { pub is_transform: bool, pub enable_distributed: bool, + + pub files_collected: bool, } impl CopyIntoTablePlan { pub async fn collect_files(&mut self, ctx: &dyn TableContext) -> Result<()> { + if self.files_collected { + return Ok(()); + } + self.files_collected = true; ctx.set_status_info("begin to list files"); let start = Instant::now(); diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs index 9bbc66eb79860..fd0cae9405d7b 100644 --- 
a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs @@ -49,6 +49,7 @@ use databend_common_storage::parquet_rs::read_metadata_async; use databend_common_storage::StageFileInfo; use databend_common_storage::StageFilesInfo; use databend_storages_common_table_meta::table::ChangeType; +use log::info; use opendal::Operator; use parquet::file::metadata::ParquetMetaData; use parquet::schema::types::SchemaDescPtr; @@ -174,7 +175,9 @@ impl ParquetRSTable { // Infer schema from the first parquet file. // Assume all parquet files have the same schema. // If not, throw error during reading. - let size = operator.stat(path).await?.content_length(); + let stat = operator.stat(path).await?; + let size = stat.content_length(); + info!("infer schema from file {}, with stat {:?}", path, stat); let first_meta = read_metadata_async(path, &operator, Some(size)).await?; let arrow_schema = infer_schema_with_extension(first_meta.file_metadata())?; let compression_ratio = get_compression_ratio(&first_meta);