Skip to content

Commit

Permalink
refactor: integrate segment pruning into pipeline (#17126)
Browse files Browse the repository at this point in the history
* refactor: integrate segment pruning into pipeline

* chore: fix unit test and clean code
  • Loading branch information
dqhl76 authored Dec 28, 2024
1 parent ef5c8bf commit 7392cdb
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 79 deletions.
13 changes: 5 additions & 8 deletions src/query/service/tests/it/storages/fuse/pruning_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use opendal::Operator;

async fn apply_block_pruning(
async fn apply_snapshot_pruning(
table_snapshot: Arc<TableSnapshot>,
schema: TableSchemaRef,
push_down: &Option<PushDownInfo>,
Expand Down Expand Up @@ -102,12 +102,9 @@ async fn apply_block_pruning(
GlobalIORuntime::instance().spawn(async move {
// avoid blocking the global io runtime
let runtime = Runtime::with_worker_threads(2, None)?;

let join_handler = runtime.spawn(async move {
let segment_pruned_result =
fuse_pruner.clone().segment_pruning(segment_locs).await?;
for segment in segment_pruned_result {
let _ = segment_tx.send(Ok(segment)).await;
for segment in segment_locs {
let _ = segment_tx.send(segment).await;
}
Ok::<_, ErrorCode>(())
});
Expand Down Expand Up @@ -140,7 +137,7 @@ async fn apply_block_pruning(
}

#[tokio::test(flavor = "multi_thread")]
async fn test_block_pruner() -> Result<()> {
async fn test_snapshot_pruner() -> Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;

Expand Down Expand Up @@ -320,7 +317,7 @@ async fn test_block_pruner() -> Result<()> {

for (id, (extra, expected_blocks, expected_rows)) in extras.into_iter().enumerate() {
let cache_key = Some(format!("test_block_pruner_{}", id));
let parts = apply_block_pruning(
let parts = apply_snapshot_pruning(
snapshot.clone(),
table.get_table_info().schema(),
&extra,
Expand Down
29 changes: 20 additions & 9 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ use databend_storages_common_pruner::BlockMetaIndex;
use databend_storages_common_pruner::TopNPrunner;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::table::ChangeType;
use log::info;
use opendal::Operator;
Expand All @@ -62,10 +61,12 @@ use crate::pruning::table_sample;
use crate::pruning::BlockPruner;
use crate::pruning::FusePruner;
use crate::pruning::SegmentLocation;
use crate::pruning::SegmentPruner;
use crate::pruning_pipeline::AsyncBlockPruneTransform;
use crate::pruning_pipeline::ExtractSegmentTransform;
use crate::pruning_pipeline::PrunedSegmentReceiverSource;
use crate::pruning_pipeline::LazySegmentReceiverSource;
use crate::pruning_pipeline::SampleBlockMetasTransform;
use crate::pruning_pipeline::SegmentPruneTransform;
use crate::pruning_pipeline::SendPartInfoSink;
use crate::pruning_pipeline::SendPartState;
use crate::pruning_pipeline::SyncBlockPruneTransform;
Expand Down Expand Up @@ -241,13 +242,11 @@ impl FuseTable {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid blocking the global io runtime
let runtime = Runtime::with_worker_threads(2, Some("prune-seg".to_string()))?;
let runtime = Runtime::with_worker_threads(2, Some("prune-pipeline".to_string()))?;
let join_handler = runtime.spawn(async move {
let segment_pruned_result =
pruner.clone().segment_pruning(lazy_init_segments).await?;
for segment in segment_pruned_result {
for segment in lazy_init_segments {
// the query may be killed or stopped early, so ignore the send error
if let Err(_e) = segment_tx.send(Ok(segment)).await {
if let Err(_e) = segment_tx.send(segment).await {
break;
}
}
Expand Down Expand Up @@ -341,15 +340,27 @@ impl FuseTable {
pruner: Arc<FusePruner>,
prune_pipeline: &mut Pipeline,
ctx: Arc<dyn TableContext>,
segment_rx: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
segment_rx: Receiver<SegmentLocation>,
part_info_tx: Sender<Result<PartInfoPtr>>,
derterministic_cache_key: Option<String>,
) -> Result<()> {
let max_threads = ctx.get_settings().get_max_threads()? as usize;
prune_pipeline.add_source(
|output| PrunedSegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output),
|output| LazySegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output),
max_threads,
)?;
let segment_pruner =
SegmentPruner::create(pruner.pruning_ctx.clone(), pruner.table_schema.clone())?;

prune_pipeline.add_transform(|input, output| {
SegmentPruneTransform::create(
input,
output,
segment_pruner.clone(),
pruner.pruning_ctx.clone(),
)
})?;

prune_pipeline
.add_transform(|input, output| ExtractSegmentTransform::create(input, output, true))?;
let sample_probability = table_sample(&pruner.push_down)?;
Expand Down
47 changes: 0 additions & 47 deletions src/query/storages/fuse/src/pruning/fuse_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,53 +443,6 @@ impl FusePruner {
}
}

// Temporarily using, will remove after finish pruning refactor.
pub async fn segment_pruning(
&self,
mut segment_locs: Vec<SegmentLocation>,
) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
// Segment pruner.
let segment_pruner =
SegmentPruner::create(self.pruning_ctx.clone(), self.table_schema.clone())?;

let mut remain = segment_locs.len() % self.max_concurrency;
let batch_size = segment_locs.len() / self.max_concurrency;
let mut works = Vec::with_capacity(self.max_concurrency);
while !segment_locs.is_empty() {
let gap_size = std::cmp::min(1, remain);
let batch_size = batch_size + gap_size;
remain -= gap_size;

let mut batch = segment_locs.drain(0..batch_size).collect::<Vec<_>>();
works.push(self.pruning_ctx.pruning_runtime.spawn({
let segment_pruner = segment_pruner.clone();
let pruning_ctx = self.pruning_ctx.clone();
async move {
// Build pruning tasks.
if let Some(internal_column_pruner) = &pruning_ctx.internal_column_pruner {
batch = batch
.into_iter()
.filter(|segment| {
internal_column_pruner
.should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0)
})
.collect::<Vec<_>>();
}
let pruned_segments = segment_pruner.pruning(batch).await?;
Result::<_>::Ok(pruned_segments)
}
}));
}

let workers = futures::future::try_join_all(works).await?;
let mut pruned_segments = vec![];
for worker in workers {
let res = worker?;
pruned_segments.extend(res);
}
Ok(pruned_segments)
}

fn extract_block_metas(
segment_path: &str,
segment: &CompactSegmentInfo,
Expand Down
43 changes: 43 additions & 0 deletions src/query/storages/fuse/src/pruning_pipeline/lazy_segment_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::fmt::Formatter;

use databend_common_expression::local_block_meta_serde;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::BlockMetaInfoPtr;

use crate::SegmentLocation;

/// Block meta that wraps a single segment location so it can be attached to a `DataBlock`.
pub struct LazySegmentMeta {
/// Location of the segment this meta refers to.
pub segment_location: SegmentLocation,
}

impl LazySegmentMeta {
pub fn create(segment_location: SegmentLocation) -> BlockMetaInfoPtr {
Box::new(LazySegmentMeta { segment_location })
}
}

impl Debug for LazySegmentMeta {
    /// Formats the meta as its bare type name; the wrapped segment location is
    /// intentionally not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct` emits only the struct name, so writing
        // the name directly produces the same output.
        f.write_str("LazySegmentMeta")
    }
}

// NOTE(review): `local_block_meta_serde!` presumably supplies the serde impls that
// `#[typetag::serde]` needs for a meta that is only used locally — confirm against
// the macro definition in `databend_common_expression`.
local_block_meta_serde!(LazySegmentMeta);

// Registers `LazySegmentMeta` as a `BlockMetaInfo` under the typetag name
// "lazy_segment_meta"; no trait methods are overridden here.
#[typetag::serde(name = "lazy_segment_meta")]
impl BlockMetaInfo for LazySegmentMeta {}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sources::AsyncSource;
use databend_common_pipeline_sources::AsyncSourcer;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;

use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta;
use crate::pruning_pipeline::LazySegmentMeta;
use crate::SegmentLocation;

pub struct PrunedSegmentReceiverSource {
pub meta_receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
pub struct LazySegmentReceiverSource {
pub meta_receiver: Receiver<SegmentLocation>,
}

impl PrunedSegmentReceiverSource {
impl LazySegmentReceiverSource {
pub fn create(
ctx: Arc<dyn TableContext>,
receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>,
receiver: Receiver<SegmentLocation>,
output_port: Arc<OutputPort>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx, output_port, Self {
Expand All @@ -44,20 +43,16 @@ impl PrunedSegmentReceiverSource {
}

#[async_trait::async_trait]
impl AsyncSource for PrunedSegmentReceiverSource {
const NAME: &'static str = "PrunedSegmentReceiverSource";
impl AsyncSource for LazySegmentReceiverSource {
const NAME: &'static str = "LazySegmentReceiverSource";
const SKIP_EMPTY_DATA_BLOCK: bool = false;

#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
match self.meta_receiver.recv().await {
Ok(Ok(segments)) => Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create(
Ok(segments) => Ok(Some(DataBlock::empty_with_meta(LazySegmentMeta::create(
segments,
)))),
Ok(Err(e)) => Err(
// The error occurred during the pruning process
e,
),
Err(_) => {
// The channel is closed, we should return None to stop generating
Ok(None)
Expand Down
8 changes: 6 additions & 2 deletions src/query/storages/fuse/src/pruning_pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ mod async_block_prune_transform;
mod block_metas_meta;
mod block_prune_result_meta;
mod extract_segment_transform;
mod lazy_segment_meta;
mod lazy_segment_receiver_source;
mod pruned_segment_meta;
mod pruned_segment_receiver_source;
mod sample_block_metas_transform;
mod segment_prune_transform;
mod send_part_info_sink;
mod sync_block_prune_transform;
mod topn_prune_transform;

pub use async_block_prune_transform::AsyncBlockPruneTransform;
pub use extract_segment_transform::ExtractSegmentTransform;
pub use pruned_segment_receiver_source::PrunedSegmentReceiverSource;
pub use lazy_segment_meta::LazySegmentMeta;
pub use lazy_segment_receiver_source::LazySegmentReceiverSource;
pub use sample_block_metas_transform::SampleBlockMetasTransform;
pub use segment_prune_transform::SegmentPruneTransform;
pub use send_part_info_sink::SendPartInfoSink;
pub use send_part_info_sink::SendPartState;
pub use sync_block_prune_transform::SyncBlockPruneTransform;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::SEGMENT_NAME_COL_NAME;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_transforms::AsyncAccumulatingTransform;
use databend_common_pipeline_transforms::AsyncAccumulatingTransformer;

use crate::pruning::PruningContext;
use crate::pruning::SegmentPruner;
use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta;
use crate::pruning_pipeline::LazySegmentMeta;

/// Pipeline transform that prunes the segment referenced by each incoming `LazySegmentMeta` block.
pub struct SegmentPruneTransform {
/// Pruner invoked on every segment location that passes the internal-column check.
pub segment_pruner: Arc<SegmentPruner>,
/// Pruning context; `transform` reads its optional `internal_column_pruner`.
pub pruning_ctx: Arc<PruningContext>,
}

impl SegmentPruneTransform {
    /// Builds a `SegmentPruneTransform` processor wired between `input` and `output`.
    ///
    /// `segment_pruner` performs the pruning itself, and `pruning_context` is
    /// stored so the transform can consult its `internal_column_pruner`.
    pub fn create(
        input: Arc<InputPort>,
        output: Arc<OutputPort>,
        segment_pruner: Arc<SegmentPruner>,
        pruning_context: Arc<PruningContext>,
    ) -> Result<ProcessorPtr> {
        let transform = SegmentPruneTransform {
            segment_pruner,
            pruning_ctx: pruning_context,
        };
        let transformer = AsyncAccumulatingTransformer::create(input, output, transform);
        Ok(ProcessorPtr::create(transformer))
    }
}

#[async_trait::async_trait]
impl AsyncAccumulatingTransform for SegmentPruneTransform {
    const NAME: &'static str = "SegmentPruneTransform";

    /// Prunes the single segment referenced by the incoming block's meta.
    ///
    /// Returns `Ok(None)` when the segment is rejected, either by the
    /// internal-column pruner or by the segment pruner. Otherwise returns an
    /// empty block carrying a `PrunedSegmentMeta` for the surviving segment.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::Internal` when the block has no meta or the meta is
    /// not a `LazySegmentMeta`, and propagates any error from the segment pruner.
    async fn transform(&mut self, mut data: DataBlock) -> Result<Option<DataBlock>> {
        // Guard: the upstream source must have attached a `LazySegmentMeta`.
        let Some(lazy_meta) = data.take_meta().and_then(LazySegmentMeta::downcast_from) else {
            return Err(ErrorCode::Internal(
                "Cannot downcast meta to LazySegmentMeta",
            ));
        };
        let location = lazy_meta.segment_location;

        // Cheap check first: drop the segment by name before calling the segment pruner.
        let rejected_by_name = self
            .pruning_ctx
            .internal_column_pruner
            .as_ref()
            .is_some_and(|pruner| !pruner.should_keep(SEGMENT_NAME_COL_NAME, &location.location.0));
        if rejected_by_name {
            return Ok(None);
        }

        let mut pruned_segments = self.segment_pruner.pruning(vec![location]).await?;
        if pruned_segments.is_empty() {
            return Ok(None);
        }

        // One location went in, so at most one pruned segment can come out.
        debug_assert!(pruned_segments.len() == 1);

        let kept = pruned_segments.pop().unwrap();
        Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create(
            kept,
        ))))
    }
}

0 comments on commit 7392cdb

Please sign in to comment.