Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query): improve fuse row fetch, use accumulating #17309

Merged
merged 4 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ use databend_common_pipeline_core::processors::Processor;
pub trait AsyncAccumulatingTransform: Send {
const NAME: &'static str;

/// One-time setup hook, awaited exactly once before the first call to
/// `transform`. The default implementation is a no-op that reports success.
async fn on_start(&mut self) -> Result<()> {
Ok(())
}

async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>>;

async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
Expand All @@ -38,6 +42,7 @@ pub struct AsyncAccumulatingTransformer<T: AsyncAccumulatingTransform + 'static>
input: Arc<InputPort>,
output: Arc<OutputPort>,

called_on_start: bool,
called_on_finish: bool,
input_data: Option<DataBlock>,
output_data: Option<DataBlock>,
Expand All @@ -51,6 +56,7 @@ impl<T: AsyncAccumulatingTransform + 'static> AsyncAccumulatingTransformer<T> {
output,
input_data: None,
output_data: None,
called_on_start: false,
called_on_finish: false,
})
}
Expand All @@ -67,6 +73,10 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra
}

fn event(&mut self) -> Result<Event> {
if !self.called_on_start {
return Ok(Event::Async);
}

if self.output.is_finished() {
if !self.called_on_finish {
return Ok(Event::Async);
Expand Down Expand Up @@ -111,6 +121,12 @@ impl<T: AsyncAccumulatingTransform + 'static> Processor for AsyncAccumulatingTra

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
if !self.called_on_start {
self.called_on_start = true;
self.inner.on_start().await?;
return Ok(());
}

if let Some(data_block) = self.input_data.take() {
self.output_data = self.inner.transform(data_block).await?;
return Ok(());
Expand Down
47 changes: 30 additions & 17 deletions src/query/storages/fuse/src/operations/read/fuse_rows_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::Value;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_pipeline_transforms::processors::AsyncTransformer;
use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;
use databend_storages_common_io::ReadSettings;

use super::native_rows_fetcher::NativeRowsFetcher;
Expand Down Expand Up @@ -132,17 +130,17 @@ pub fn row_fetch_processor(
/// Storage-format-specific strategy for materializing full rows from a set
/// of row ids (implemented below for native and parquet readers).
pub trait RowsFetcher {
/// Prepare the fetcher before any fetch happens (the transform's
/// `on_start` delegates here — see the impl above).
async fn on_start(&mut self) -> Result<()>;
/// Fetch the rows identified by `row_ids`, returned as one `DataBlock`.
async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
/// Schema of the fetched columns.
/// NOTE(review): this PR diff appears to delete `schema` from the trait —
/// both impls below drop their `schema` methods; confirm against the
/// merged revision before relying on this member.
fn schema(&self) -> DataSchema;
}

/// Accumulating transform that buffers row-id blocks and resolves them into
/// full rows via the format-specific fetcher `F`.
pub struct TransformRowsFetcher<F: RowsFetcher> {
// Offset of the row-id column within incoming blocks.
row_id_col_offset: usize,
// Underlying format-specific row fetcher.
fetcher: F,
// Presumably controls wrapping fetched columns as nullable —
// TODO confirm against the (not visible here) column-append code.
need_wrap_nullable: bool,
// Input blocks accumulated by `transform`; concatenated, fetched
// against, and cleared in `on_finish`.
blocks: Vec<DataBlock>,
}

#[async_trait::async_trait]
impl<F> AsyncTransform for TransformRowsFetcher<F>
impl<F> AsyncAccumulatingTransform for TransformRowsFetcher<F>
where F: RowsFetcher + Send + Sync + 'static
{
const NAME: &'static str = "TransformRowsFetcher";
Expand All @@ -152,18 +150,25 @@ where F: RowsFetcher + Send + Sync + 'static
self.fetcher.on_start().await
}

/// Buffer the incoming block without emitting anything; the actual row
/// fetch is deferred to `on_finish`, where all buffered blocks are
/// concatenated and resolved in one pass (see below).
async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
// Returning `None` means no output block is produced for this input.
self.blocks.push(data);
Ok(None)
}

#[async_backtrace::framed]
async fn transform(&mut self, mut data: DataBlock) -> Result<DataBlock> {
async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
if self.blocks.is_empty() {
return Ok(None);
}

let start_time = std::time::Instant::now();
let num_blocks = self.blocks.len();
let mut data = DataBlock::concat(&self.blocks)?;
self.blocks.clear();

let num_rows = data.num_rows();
if num_rows == 0 {
// Although the data block is empty, we need to add empty columns to align the schema.
let fetched_schema = self.fetcher.schema();
for f in fetched_schema.fields().iter() {
let builder = ColumnBuilder::with_capacity(f.data_type(), 0);
let col = builder.build();
data.add_column(BlockEntry::new(f.data_type().clone(), Value::Column(col)));
}
return Ok(data);
return Ok(None);
}

let entry = &data.columns()[self.row_id_col_offset];
Expand All @@ -189,7 +194,14 @@ where F: RowsFetcher + Send + Sync + 'static
}
}

Ok(data)
log::info!(
"TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
num_rows,
num_blocks,
start_time.elapsed().as_millis()
);

Ok(Some(data))
}
}

Expand All @@ -203,10 +215,11 @@ where F: RowsFetcher + Send + Sync + 'static
fetcher: F,
need_wrap_nullable: bool,
) -> ProcessorPtr {
ProcessorPtr::create(AsyncTransformer::create(input, output, Self {
ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
row_id_col_offset,
fetcher,
need_wrap_nullable,
blocks: vec![],
}))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_storage::ColumnNodes;
use databend_storages_common_cache::LoadParams;
Expand Down Expand Up @@ -148,10 +147,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for NativeRowsFetcher<BLOCKING_IO> {

Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
}

// Schema of the columns this fetcher produces, taken from its reader.
// NOTE(review): this hunk shows the method being removed from
// `NativeRowsFetcher` by the PR; text retained verbatim from the
// pre-change side of the diff.
fn schema(&self) -> DataSchema {
self.reader.data_schema()
}
}

impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_common_catalog::table::Table;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_storage::ColumnNodes;
use databend_storages_common_cache::LoadParams;
Expand Down Expand Up @@ -158,10 +157,6 @@ impl<const BLOCKING_IO: bool> RowsFetcher for ParquetRowsFetcher<BLOCKING_IO> {

Ok(DataBlock::take_blocks(&blocks, &indices, num_rows))
}

// Schema of the columns this fetcher produces, taken from its reader.
// NOTE(review): this hunk shows the method being removed from
// `ParquetRowsFetcher` by the PR; text retained verbatim from the
// pre-change side of the diff.
fn schema(&self) -> DataSchema {
self.reader.data_schema()
}
}

impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
Expand Down
Loading