Skip to content

Commit

Permalink
chore(query): improve fuse row fetch, use accumulating (#17309)
Browse files Browse the repository at this point in the history
* fix(query): improve fuse row fetch, use accumulating

* fix(query): improve fuse row fetch, use accumulating

* fix(query): improve fuse row fetch, use accumulating

* fix(query): add extra log
  • Loading branch information
sundy-li authored Jan 17, 2025
1 parent e6e3aec commit 37d96b2
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ use databend_common_pipeline_core::processors::Processor;
pub trait AsyncAccumulatingTransform: Send {
const NAME: &'static str;

async fn on_start(&mut self) -> Result<()> {
Ok(())
}

async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;

async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
Expand All @@ -38,6 +42,7 @@ pub struct AsyncAccumulatingTransformer<T: AsyncAccumulatingTransform + 'static>
input: Arc<InputPort>,
output: Arc<OutputPort>,

called_on_start: bool,
called_on_finish: bool,
input_data: Option<DataBlock>,
output_data: Option<DataBlock>,
Expand All @@ -51,6 +56,7 @@ impl<T: AsyncAccumulatingTransform + 'static> AsyncAccumulatingTransformer<T> {
output,
input_data: None,
output_data: None,
called_on_start: false,
called_on_finish: false,
})
}
Expand All @@ -67,6 +73,10 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
}

fn event(&mut self) -> Result<Event> {
if !self.called_on_start {
return Ok(Event::Async);
}

if self.output.is_finished() {
if !self.called_on_finish {
return Ok(Event::Async);
Expand Down Expand Up @@ -111,6 +121,12 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
if !self.called_on_start {
self.called_on_start = true;
self.inner.on_start().await?;
return Ok(());
}

if let Some(data_block) = self.input_data.take() {
self.output_data = self.inner.transform(data_block).await?;
return Ok(());
Expand Down
47 changes: 30 additions & 17 deletions src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::Value;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_pipeline_transforms::processors::AsyncTransformer;
use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;
use databend_storages_common_io::ReadSettings;

use super::native_rows_fetcher::NativeRowsFetcher;
Expand Down Expand Up @@ -132,17 +130,17 @@ pub fn row_fetch_processor(
/// Abstraction over the I/O layer that resolves row ids back into full rows.
/// NOTE(review): this is a diff view — `schema` appears to be removed by this
/// commit (its impls in the native/parquet fetchers are deleted); confirm
/// against the post-commit file before relying on it.
pub trait RowsFetcher {
/// One-time setup hook, invoked before any `fetch` call.
async fn on_start(&mut self) -> Result<()>;
/// Fetch the rows identified by `row_ids` and return them as one `DataBlock`.
async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
/// Schema of the columns produced by `fetch` (previously used to build
/// empty aligned columns for zero-row inputs).
fn schema(&self) -> DataSchema;
}

/// Accumulating pipeline transform that buffers incoming blocks and, on
/// finish, concatenates them and resolves their row ids into full rows via
/// the wrapped `RowsFetcher`.
pub struct TransformRowsFetcher<F: RowsFetcher> {
// Offset of the row-id column within incoming blocks.
row_id_col_offset: usize,
// Underlying fetcher that performs the actual row lookups.
fetcher: F,
// Presumably wraps fetched columns into nullable form when set — the
// wrapping code is outside this hunk; TODO confirm.
need_wrap_nullable: bool,
// Input blocks buffered by `transform`; concatenated and fetched against
// in `on_finish`.
blocks: Vec<DataBlock>,
}

#[async_trait::async_trait]
impl<F> AsyncTransform for TransformRowsFetcher<F>
impl<F> AsyncAccumulatingTransform for TransformRowsFetcher<F>
where F: RowsFetcher + Send + Sync + 'static
{
const NAME: &'static str = "TransformRowsFetcher";
Expand All @@ -152,18 +150,25 @@ where F: RowsFetcher + Send + Sync + 'static
self.fetcher.on_start().await
}

async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
self.blocks.push(data);
Ok(None)
}

#[async_backtrace::framed]
async fn transform(&mut self, mut data: DataBlock) -> Result<DataBlock> {
async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
if self.blocks.is_empty() {
return Ok(None);
}

let start_time = std::time::Instant::now();
let num_blocks = self.blocks.len();
let mut data = DataBlock::concat(&self.blocks)?;
self.blocks.clear();

let num_rows = data.num_rows();
if num_rows == 0 {
// Although the data block is empty, we need to add empty columns to align the schema.
let fetched_schema = self.fetcher.schema();
for f in fetched_schema.fields().iter() {
let builder = ColumnBuilder::with_capacity(f.data_type(), 0);
let col = builder.build();
data.add_column(BlockEntry::new(f.data_type().clone(), Value::Column(col)));
}
return Ok(data);
return Ok(None);
}

let entry = &data.columns()[self.row_id_col_offset];
Expand All @@ -189,7 +194,14 @@ where F: RowsFetcher + Send + Sync + 'static
}
}

Ok(data)
log::info!(
"TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
num_rows,
num_blocks,
start_time.elapsed().as_millis()
);

Ok(Some(data))
}
}

Expand All @@ -203,10 +215,11 @@ where F: RowsFetcher + Send + Sync + 'static
fetcher: F,
need_wrap_nullable: bool,
) -> ProcessorPtr {
ProcessorPtr::create(AsyncTransformer::create(input, output, Self {
ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
row_id_col_offset,
fetcher,
need_wrap_nullable,
blocks: vec![],
}))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_storage::ColumnNodes;
use databend_storages_common_cache::LoadParams;
Expand Down Expand Up @@ -148,10 +147,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for NativeRowsFetcher<BLOCKING_IO> {

Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
}

fn schema(&self) -> DataSchema {
self.reader.data_schema()
}
}

impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_storage::ColumnNodes;
use databend_storages_common_cache::LoadParams;
Expand Down Expand Up @@ -158,10 +157,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {

Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
}

fn schema(&self) -> DataSchema {
self.reader.data_schema()
}
}

impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
Expand Down

0 comments on commit 37d96b2

Please sign in to comment.