From 8466be7294408f5981185355812dbb93f9b20714 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Thu, 16 Jan 2025 21:16:25 +0800
Subject: [PATCH 1/4] fix(query): improve fuse row fetch, use accumulating

---
 .../transform_accumulating_async.rs           | 16 +++++++++
 .../src/operations/read/fuse_rows_fetcher.rs  | 36 +++++++++----------
 .../operations/read/native_rows_fetcher.rs    |  4 ---
 .../operations/read/parquet_rows_fetcher.rs   |  4 ---
 4 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs
index d4e4455595b9e..ad8c53fdb3f62 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs
@@ -26,6 +26,10 @@ use databend_common_pipeline_core::processors::Processor;
 pub trait AsyncAccumulatingTransform: Send {
     const NAME: &'static str;
 
+    async fn on_start(&mut self) -> Result<()> {
+        Ok(())
+    }
+
     async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;
 
     async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
@@ -38,6 +42,7 @@ pub struct AsyncAccumulatingTransformer<T: AsyncAccumulatingTransform + 'static>
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
 
+    called_on_start: bool,
     called_on_finish: bool,
     input_data: Option<DataBlock>,
     output_data: Option<DataBlock>,
@@ -51,6 +56,7 @@ impl<T: AsyncAccumulatingTransform + 'static> AsyncAccumulatingTransformer<T> {
             output,
             input_data: None,
             output_data: None,
+            called_on_start: false,
             called_on_finish: false,
         })
     }
@@ -67,6 +73,10 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
     }
 
     fn event(&mut self) -> Result<Event> {
+        if !self.called_on_start {
+            return Ok(Event::Async);
+        }
+
         if self.output.is_finished() {
             if !self.called_on_finish {
                 return Ok(Event::Async);
@@ -111,6 +121,12 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
 
     #[async_backtrace::framed]
     async fn async_process(&mut self) -> Result<()> {
+        if !self.called_on_start {
+            self.called_on_start = true;
+            self.inner.on_start().await?;
+            return Ok(());
+        }
+
         if let Some(data_block) = self.input_data.take() {
             self.output_data = self.inner.transform(data_block).await?;
             return Ok(());
diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index 97b32725f5284..28a8b64010a45 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -25,15 +25,14 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
-use databend_common_expression::ColumnBuilder;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchema;
 use databend_common_expression::Value;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::ProcessorPtr;
-use databend_common_pipeline_transforms::processors::AsyncTransform;
-use databend_common_pipeline_transforms::processors::AsyncTransformer;
+use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
+use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;
 use databend_storages_common_io::ReadSettings;
 
 use super::native_rows_fetcher::NativeRowsFetcher;
@@ -132,17 +131,17 @@ pub fn row_fetch_processor(
 pub trait RowsFetcher {
     async fn on_start(&mut self) -> Result<()>;
     async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
-    fn schema(&self) -> DataSchema;
 }
 
 pub struct TransformRowsFetcher<F: RowsFetcher> {
     row_id_col_offset: usize,
     fetcher: F,
     need_wrap_nullable: bool,
+    blocks: Vec<DataBlock>,
 }
 
 #[async_trait::async_trait]
-impl<F> AsyncTransform for TransformRowsFetcher<F>
+impl<F> AsyncAccumulatingTransform for TransformRowsFetcher<F>
 where F: RowsFetcher + Send + Sync + 'static
 {
     const NAME: &'static str = "TransformRowsFetcher";
@@ -152,20 +151,20 @@ where F: RowsFetcher + Send + Sync + 'static
         self.fetcher.on_start().await
     }
 
+    async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
+        self.blocks.push(data);
+        Ok(None)
+    }
+
     #[async_backtrace::framed]
-    async fn transform(&mut self, mut data: DataBlock) -> Result<DataBlock> {
-        let num_rows = data.num_rows();
-        if num_rows == 0 {
-            // Although the data block is empty, we need to add empty columns to align the schema.
-            let fetched_schema = self.fetcher.schema();
-            for f in fetched_schema.fields().iter() {
-                let builder = ColumnBuilder::with_capacity(f.data_type(), 0);
-                let col = builder.build();
-                data.add_column(BlockEntry::new(f.data_type().clone(), Value::Column(col)));
-            }
-            return Ok(data);
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        if self.blocks.is_empty() {
+            return Ok(None);
         }
 
+        let mut data = DataBlock::concat(&self.blocks)?;
+        let num_rows = data.num_rows();
+
         let entry = &data.columns()[self.row_id_col_offset];
         let value = entry
             .value
@@ -189,7 +188,7 @@ where F: RowsFetcher + Send + Sync + 'static
             }
         }
 
-        Ok(data)
+        Ok(Some(data))
     }
 }
 
@@ -203,10 +202,11 @@ where F: RowsFetcher + Send + Sync + 'static
         fetcher: F,
         need_wrap_nullable: bool,
     ) -> ProcessorPtr {
-        ProcessorPtr::create(AsyncTransformer::create(input, output, Self {
+        ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
             row_id_col_offset,
             fetcher,
             need_wrap_nullable,
+            blocks: vec![],
         }))
     }
 }
diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
index b7e1b16a0e3b4..7bd990296034d 100644
--- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
@@ -148,10 +148,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for NativeRowsFetcher<BLOCKING_IO> {
 
         Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
     }
-
-    fn schema(&self) -> DataSchema {
-        self.reader.data_schema()
-    }
 }
 
 impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 260fe64a36dae..8666cf9ab93ec 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -158,10 +158,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {
 
         Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
     }
-
-    fn schema(&self) -> DataSchema {
-        self.reader.data_schema()
-    }
 }
 
 impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {

From b49a989b182f1695bf35a1f5ebc0896fcba4858c Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Thu, 16 Jan 2025 21:18:48 +0800
Subject: [PATCH 2/4] fix(query): improve fuse row fetch, use accumulating

---
 .../storages/fuse/src/operations/read/fuse_rows_fetcher.rs   | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index 28a8b64010a45..85e239dc832d8 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -163,7 +163,12 @@ where F: RowsFetcher + Send + Sync + 'static
         }
 
         let mut data = DataBlock::concat(&self.blocks)?;
+        self.blocks.clear();
+
         let num_rows = data.num_rows();
+        if num_rows == 0 {
+            return Ok(None);
+        }
 
         let entry = &data.columns()[self.row_id_col_offset];
         let value = entry

From 734f99247c4261ecf3a6c7f829215265ffb98411 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Thu, 16 Jan 2025 21:24:20 +0800
Subject: [PATCH 3/4] fix(query): improve fuse row fetch, use accumulating

---
 src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs | 1 -
 .../storages/fuse/src/operations/read/native_rows_fetcher.rs     | 1 -
 .../storages/fuse/src/operations/read/parquet_rows_fetcher.rs    | 1 -
 3 files changed, 3 deletions(-)

diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index 85e239dc832d8..7fb3a05d37217 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -26,7 +26,6 @@ use databend_common_expression::types::NumberDataType;
 use databend_common_expression::BlockEntry;
 use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::Value;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
index 7bd990296034d..ca45afc996279 100644
--- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
@@ -26,7 +26,6 @@ use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::LoadParams;
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index 8666cf9ab93ec..35b8a47ed9aaf 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -26,7 +26,6 @@ use databend_common_catalog::table::Table;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
-use databend_common_expression::DataSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_storage::ColumnNodes;
 use databend_storages_common_cache::LoadParams;

From 39b72150e864d2cd8be6f519afa12667232ad197 Mon Sep 17 00:00:00 2001
From: sundyli <543950155@qq.com>
Date: Fri, 17 Jan 2025 09:06:15 +0800
Subject: [PATCH 4/4] fix(query): add extra log

---
 .../fuse/src/operations/read/fuse_rows_fetcher.rs        | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
index 7fb3a05d37217..e0ec3424b6161 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
@@ -161,6 +161,8 @@ where F: RowsFetcher + Send + Sync + 'static
             return Ok(None);
         }
 
+        let start_time = std::time::Instant::now();
+        let num_blocks = self.blocks.len();
         let mut data = DataBlock::concat(&self.blocks)?;
         self.blocks.clear();
 
@@ -192,6 +194,13 @@ where F: RowsFetcher + Send + Sync + 'static
             }
         }
 
+        log::info!(
+            "TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
+            num_rows,
+            num_blocks,
+            start_time.elapsed().as_millis()
+        );
+
         Ok(Some(data))
     }
 }