diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs
index d4e4455595b9e..ad8c53fdb3f62 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs
@@ -26,6 +26,10 @@ use databend_common_pipeline_core::processors::Processor;
 pub trait AsyncAccumulatingTransform: Send {
     const NAME: &'static str;
 
+    async fn on_start(&mut self) -> Result<()> {
+        Ok(())
+    }
+
     async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;
 
     async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
@@ -38,6 +42,7 @@ pub struct AsyncAccumulatingTransformer<T: AsyncAccumulatingTransform> {
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
 
+    called_on_start: bool,
     called_on_finish: bool,
     input_data: Option<DataBlock>,
     output_data: Option<DataBlock>,
@@ -51,6 +56,7 @@ impl<T: AsyncAccumulatingTransform + 'static> AsyncAccumulatingTransformer<T> {
             output,
             input_data: None,
             output_data: None,
+            called_on_start: false,
             called_on_finish: false,
         })
     }
@@ -67,6 +73,10 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
     }
     fn event(&mut self) -> Result<Event> {
+        if !self.called_on_start {
+            return Ok(Event::Async);
+        }
+
         if self.output.is_finished() {
             if !self.called_on_finish {
                 return Ok(Event::Async);
             }
@@ -111,6 +121,12 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
     #[async_backtrace::framed]
     async fn async_process(&mut self) -> Result<()> {
+        if !self.called_on_start {
+            self.called_on_start = true;
+            self.inner.on_start().await?;
+            return Ok(());
+        }
+
         if let Some(data_block) = self.input_data.take() {
             self.output_data = self.inner.transform(data_block).await?;
             return Ok(());
         }
diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index 97b32725f5284..e0ec3424b6161 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -25,15 +25,13 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
-use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::Value;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
-use databend_common_pipeline_transforms::processors::AsyncTransform;
-use databend_common_pipeline_transforms::processors::AsyncTransformer;
+use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
+use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;
 use databend_storages_common_io::ReadSettings;
 
 use super::native_rows_fetcher::NativeRowsFetcher;
@@ -132,17 +130,17 @@ pub fn row_fetch_processor(
 pub trait RowsFetcher {
     async fn on_start(&mut self) -> Result<()>;
     async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
-    fn schema(&self) -> DataSchema;
 }
 
 pub struct TransformRowsFetcher<F: RowsFetcher> {
     row_id_col_offset: usize,
     fetcher: F,
     need_wrap_nullable: bool,
+    blocks: Vec<DataBlock>,
 }
 
 #[async_trait::async_trait]
-impl<F> AsyncTransform for TransformRowsFetcher<F>
+impl<F> AsyncAccumulatingTransform for TransformRowsFetcher<F>
 where F: RowsFetcher + Send + Sync + 'static
 {
     const NAME: &'static str = "TransformRowsFetcher";
@@ -152,18 +150,25 @@ where F: RowsFetcher + Send + Sync + 'static
         self.fetcher.on_start().await
     }
 
+    async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
+        self.blocks.push(data);
+        Ok(None)
+    }
+
     #[async_backtrace::framed]
-    async fn transform(&mut self, mut data: DataBlock) -> Result<DataBlock> {
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        if self.blocks.is_empty() {
+            return Ok(None);
+        }
+
+        let start_time = std::time::Instant::now();
+        let num_blocks = self.blocks.len();
+        let mut data = DataBlock::concat(&self.blocks)?;
+        self.blocks.clear();
+
         let num_rows = data.num_rows();
         if num_rows == 0 {
-            // Although the data block is empty, we need to add empty columns to align the schema.
-            let fetched_schema = self.fetcher.schema();
-            for f in fetched_schema.fields().iter() {
-                let builder = ColumnBuilder::with_capacity(f.data_type(), 0);
-                let col = builder.build();
-                data.add_column(BlockEntry::new(f.data_type().clone(), Value::Column(col)));
-            }
-            return Ok(data);
+            return Ok(None);
         }
 
         let entry = &data.columns()[self.row_id_col_offset];
@@ -189,7 +194,14 @@ where F: RowsFetcher + Send + Sync + 'static
             }
         }
 
-        Ok(data)
+        log::info!(
+            "TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
+            num_rows,
+            num_blocks,
+            start_time.elapsed().as_millis()
+        );
+
+        Ok(Some(data))
     }
 }
@@ -203,10 +215,11 @@ where F: RowsFetcher + Send + Sync + 'static
         fetcher: F,
         need_wrap_nullable: bool,
     ) -> ProcessorPtr {
-        ProcessorPtr::create(AsyncTransformer::create(input, output, Self {
+        ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
             row_id_col_offset,
            fetcher,
             need_wrap_nullable,
+            blocks: vec![],
         }))
     }
 }
diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
index b7e1b16a0e3b4..ca45afc996279 100644
--- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
@@ -26,7 +26,6 @@ use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::LoadParams;
@@ -148,10 +147,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for NativeRowsFetcher<BLOCKING_IO> {
 
         Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
     }
-
-    fn schema(&self) -> DataSchema {
-        self.reader.data_schema()
-    }
 }
 
 impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 260fe64a36dae..35b8a47ed9aaf 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -26,7 +26,6 @@ use databend_common_catalog::table::Table;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::LoadParams;
@@ -158,10 +157,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
         Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
     }
-
-    fn schema(&self) -> DataSchema {
-        self.reader.data_schema()
-    }
 }
 
 impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {