From 234bb00ce491af189633257457e2be59ea383703 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sat, 2 Dec 2023 13:50:29 +0800 Subject: [PATCH] change the log queue to global singleton instance --- src/query/storages/system/src/log_queue.rs | 28 +++------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/src/query/storages/system/src/log_queue.rs b/src/query/storages/system/src/log_queue.rs index 67c3869d2583e..432c70a17c3bf 100644 --- a/src/query/storages/system/src/log_queue.rs +++ b/src/query/storages/system/src/log_queue.rs @@ -13,11 +13,10 @@ // limitations under the License. use std::any::Any; -use std::any::TypeId; -use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; +use common_base::base::GlobalInstance; use common_catalog::plan::DataSourcePlan; use common_catalog::plan::PartStatistics; use common_catalog::plan::Partitions; @@ -25,7 +24,6 @@ use common_catalog::plan::PartitionsShuffleKind; use common_catalog::plan::PushDownInfo; use common_catalog::table::Table; use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::DataType; use common_expression::ColumnBuilder; @@ -39,7 +37,6 @@ use common_pipeline_core::processors::ProcessorPtr; use common_pipeline_core::Pipeline; use common_pipeline_sources::SyncSource; use common_pipeline_sources::SyncSourcer; -use once_cell::sync::OnceCell; use parking_lot::RwLock; use crate::table::SystemTablePart; @@ -71,32 +68,13 @@ pub struct SystemLogQueue { data: Arc>>, } -static INSTANCES_MAP: OnceCell>>> = - OnceCell::new(); - impl SystemLogQueue { pub fn init(max_rows: usize) { - let instance: Box = Box::new(Self::create(max_rows)); - - let instances_map = INSTANCES_MAP.get_or_init(move || RwLock::new(HashMap::new())); - let mut write_guard = instances_map.write(); - write_guard.insert(TypeId::of::(), instance); + GlobalInstance::set(Arc::new(Self::create(max_rows))); } pub fn instance() -> Result>> { - unsafe { - match INSTANCES_MAP - .get_unchecked() - .read() - .get(&TypeId::of::()) - { - None => Err(ErrorCode::Internal("")), - Some(instance) => instance - .downcast_ref::>() - .cloned() - .ok_or(ErrorCode::Internal("SystemLogQueue instance get error")), - } - } + GlobalInstance::get() } pub fn create(max_rows: usize) -> Arc> {