From 35a8dd60cac02c512f5cff95f8832ae95bb13d20 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 16 Jan 2025 21:14:35 +0800 Subject: [PATCH] chore: add enable_prune_cache setting --- .../fuse/src/operations/read_partitions.rs | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index e93383d58ee3..aeaeb7fac99d 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -206,34 +206,37 @@ impl FuseTable { ) }); - if let Some((_stat, part)) = Self::check_prune_cache(&derterministic_cache_key) { - let sender = part_info_tx.clone(); - info!("prune pipeline: get prune result from cache"); - source_pipeline.set_on_init(move || { - // We cannot use the runtime associated with the query to avoid increasing its lifetime. - GlobalIORuntime::instance().spawn(async move { - // avoid block global io runtime - let runtime = Runtime::with_worker_threads(2, Some("send-parts".to_string()))?; - - let join_handler = runtime.spawn(async move { - for part in part.partitions { - // the sql may be killed or early stop, ignore the error - if let Err(_e) = sender.send(Ok(part)).await { - break; + if ctx.get_settings().get_enable_prune_cache()? { + if let Some((_stat, part)) = Self::check_prune_cache(&derterministic_cache_key) { + let sender = part_info_tx.clone(); + info!("prune pipeline: get prune result from cache"); + source_pipeline.set_on_init(move || { + // We cannot use the runtime associated with the query to avoid increasing its lifetime. + GlobalIORuntime::instance().spawn(async move { + // avoid block global io runtime + let runtime = + Runtime::with_worker_threads(2, Some("send-parts".to_string()))?; + + let join_handler = runtime.spawn(async move { + for part in part.partitions { + // the sql may be killed or early stop, ignore the error + if let Err(_e) = sender.send(Ok(part)).await { + break; + } } + }); + + if let Err(cause) = join_handler.await { + log::warn!("Join error while in prune pipeline, cause: {:?}", cause); } - }); - if let Err(cause) = join_handler.await { - log::warn!("Join error while in prune pipeline, cause: {:?}", cause); - } + Result::Ok(()) + }); - Result::Ok(()) + Ok(()) }); - - Ok(()) - }); - return Ok(None); + return Ok(None); + } } let mut prune_pipeline = Pipeline::create();