Skip to content

Commit

Permalink
change the log queue to global singleton instance
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Dec 2, 2023
1 parent 7395ad0 commit 234bb00
Showing 1 changed file with 3 additions and 25 deletions.
28 changes: 3 additions & 25 deletions src/query/storages/system/src/log_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@
// limitations under the License.

use std::any::Any;
use std::any::TypeId;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;

use common_base::base::GlobalInstance;
use common_catalog::plan::DataSourcePlan;
use common_catalog::plan::PartStatistics;
use common_catalog::plan::Partitions;
use common_catalog::plan::PartitionsShuffleKind;
use common_catalog::plan::PushDownInfo;
use common_catalog::table::Table;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::ColumnBuilder;
Expand All @@ -39,7 +37,6 @@ use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_core::Pipeline;
use common_pipeline_sources::SyncSource;
use common_pipeline_sources::SyncSourcer;
use once_cell::sync::OnceCell;
use parking_lot::RwLock;

use crate::table::SystemTablePart;
Expand Down Expand Up @@ -71,32 +68,13 @@ pub struct SystemLogQueue<Event: SystemLogElement> {
data: Arc<RwLock<Data<Event>>>,
}

static INSTANCES_MAP: OnceCell<RwLock<HashMap<TypeId, Box<dyn Any + 'static + Send + Sync>>>> =
OnceCell::new();

impl<Event: SystemLogElement + 'static> SystemLogQueue<Event> {
pub fn init(max_rows: usize) {
let instance: Box<dyn Any + 'static + Send + Sync> = Box::new(Self::create(max_rows));

let instances_map = INSTANCES_MAP.get_or_init(move || RwLock::new(HashMap::new()));
let mut write_guard = instances_map.write();
write_guard.insert(TypeId::of::<Self>(), instance);
GlobalInstance::set(Arc::new(Self::create(max_rows)));
}

pub fn instance() -> Result<Arc<SystemLogQueue<Event>>> {
unsafe {
match INSTANCES_MAP
.get_unchecked()
.read()
.get(&TypeId::of::<Self>())
{
None => Err(ErrorCode::Internal("")),
Some(instance) => instance
.downcast_ref::<Arc<Self>>()
.cloned()
.ok_or(ErrorCode::Internal("SystemLogQueue instance get error")),
}
}
GlobalInstance::get()
}

pub fn create(max_rows: usize) -> Arc<SystemLogQueue<Event>> {
Expand Down

0 comments on commit 234bb00

Please sign in to comment.