Skip to content

Commit

Permalink
refactor(query): only call once finish callback (#17293)
Browse files Browse the repository at this point in the history
* fix(query): only call once finish callback

* fix(query): only call once finish callback

* fix(query): fix
  • Loading branch information
sundy-li authored Jan 16, 2025
1 parent 71f00b8 commit e9fc0a1
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
12 changes: 7 additions & 5 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ impl PipelineBuilder {
}

// unload spill metas
self.main_pipeline
.set_on_finished(always_callback(move |_info: &ExecutionInfo| {
self.ctx.unload_spill_meta();
Ok(())
}));
if !self.ctx.mark_unload_callbacked() {
self.main_pipeline
.set_on_finished(always_callback(move |_info: &ExecutionInfo| {
self.ctx.unload_spill_meta();
Ok(())
}));
}

Ok(PipelineBuildResult {
main_pipeline: self.main_pipeline,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ impl QueryContext {

pub fn update_init_query_id(&self, id: String) {
self.shared.spilled_files.write().clear();
self.shared
.unload_callbacked
.store(false, Ordering::Release);
self.shared.cluster_spill_progress.write().clear();
*self.shared.init_query_id.write() = id;
}
Expand Down Expand Up @@ -471,6 +474,12 @@ impl QueryContext {
Ok(table)
}

pub fn mark_unload_callbacked(&self) -> bool {
self.shared
.unload_callbacked
.fetch_or(true, Ordering::SeqCst)
}

pub fn unload_spill_meta(&self) {
const SPILL_META_SUFFIX: &str = ".list";
let r = self.shared.spilled_files.read();
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub struct QueryContextShared {
pub(in crate::sessions) cluster_spill_progress: Arc<RwLock<HashMap<String, SpillProgress>>>,
pub(in crate::sessions) spilled_files:
Arc<RwLock<HashMap<crate::spillers::Location, crate::spillers::Layout>>>,
pub(in crate::sessions) unload_callbacked: AtomicBool,
}

impl QueryContextShared {
Expand Down Expand Up @@ -209,6 +210,7 @@ impl QueryContextShared {

cluster_spill_progress: Default::default(),
spilled_files: Default::default(),
unload_callbacked: AtomicBool::new(false),
warehouse_cache: Arc::new(RwLock::new(None)),
}))
}
Expand Down

0 comments on commit e9fc0a1

Please sign in to comment.