Skip to content

Commit

Permalink
chore: add enable_prune_cache setting
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 committed Jan 16, 2025
1 parent e987ff8 commit 35a8dd6
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,34 +206,37 @@ impl FuseTable {
)
});

if let Some((_stat, part)) = Self::check_prune_cache(&derterministic_cache_key) {
let sender = part_info_tx.clone();
info!("prune pipeline: get prune result from cache");
source_pipeline.set_on_init(move || {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid block global io runtime
let runtime = Runtime::with_worker_threads(2, Some("send-parts".to_string()))?;

let join_handler = runtime.spawn(async move {
for part in part.partitions {
// the sql may be killed or early stop, ignore the error
if let Err(_e) = sender.send(Ok(part)).await {
break;
if ctx.get_settings().get_enable_prune_cache()? {
if let Some((_stat, part)) = Self::check_prune_cache(&derterministic_cache_key) {
let sender = part_info_tx.clone();
info!("prune pipeline: get prune result from cache");
source_pipeline.set_on_init(move || {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid block global io runtime
let runtime =
Runtime::with_worker_threads(2, Some("send-parts".to_string()))?;

let join_handler = runtime.spawn(async move {
for part in part.partitions {
// the sql may be killed or early stop, ignore the error
if let Err(_e) = sender.send(Ok(part)).await {
break;
}
}
});

if let Err(cause) = join_handler.await {
log::warn!("Join error while in prune pipeline, cause: {:?}", cause);
}
});

if let Err(cause) = join_handler.await {
log::warn!("Join error while in prune pipeline, cause: {:?}", cause);
}
Result::Ok(())
});

Result::Ok(())
Ok(())
});

Ok(())
});
return Ok(None);
return Ok(None);
}
}

let mut prune_pipeline = Pipeline::create();
Expand Down

0 comments on commit 35a8dd6

Please sign in to comment.