chore: try add distributed unit test
BohuTANG committed Dec 2, 2023
1 parent d5e217e commit 7395ad0
Showing 7 changed files with 142 additions and 11 deletions.
12 changes: 5 additions & 7 deletions src/query/service/src/test_kits/context.rs
@@ -33,6 +33,7 @@ use crate::sessions::SessionManager;
use crate::sessions::SessionType;
use crate::sessions::TableContext;
use crate::test_kits::ConfigBuilder;
use crate::test_kits::TestFixture;
use crate::test_kits::TestGlobalServices;
use crate::test_kits::TestGuard;

@@ -131,6 +132,7 @@ pub async fn create_query_context_with_config_new(
Ok((guard, dummy_query_context, dummy_session))
}

#[derive(Clone)]
pub struct ClusterDescriptor {
local_node_id: String,
cluster_nodes_list: Vec<Arc<NodeInfo>>,
@@ -175,12 +177,8 @@ impl Default for ClusterDescriptor
#[allow(dead_code)]
pub async fn create_query_context_with_cluster(
desc: ClusterDescriptor,
) -> Result<(TestGuard, Arc<QueryContext>)> {
let config = ConfigBuilder::create().build();
let guard = TestGlobalServices::setup(&config).await?;
let dummy_session = SessionManager::instance()
.create_session(SessionType::Dummy)
.await?;
) -> Result<Arc<QueryContext>> {
let dummy_session = TestFixture::create_session(SessionType::Dummy).await?;
let local_id = desc.local_node_id;
let nodes = desc.cluster_nodes_list;

@@ -190,5 +188,5 @@ pub async fn create_query_context_with_cluster(
)?);

dummy_query_context.get_settings().set_max_threads(8)?;
Ok((guard, dummy_query_context))
Ok(dummy_query_context)
}
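For reference, a minimal sketch (not part of this diff) of how a caller adapts to the new signature, assuming global services are already set up elsewhere, e.g. via TestGlobalServices::setup, and kept alive for the test's duration; the function name here is illustrative:

use databend_query::test_kits::create_query_context_with_cluster;
use databend_query::test_kits::ClusterDescriptor;

async fn caller_sketch() -> common_exception::Result<()> {
    // The TestGuard is no longer returned here; the caller owns global setup.
    let cluster_desc = ClusterDescriptor::new().with_local_id("test-node");
    let ctx = create_query_context_with_cluster(cluster_desc).await?;
    // ... run assertions against `ctx` ...
    Ok(())
}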
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/table_test_fixture.rs
@@ -175,7 +175,7 @@ impl TestFixture {
Self::with_setup(OSSSetup { config }).await
}

async fn create_session(session_type: SessionType) -> Result<Arc<Session>> {
pub async fn create_session(session_type: SessionType) -> Result<Arc<Session>> {
let mut user_info = UserInfo::new("root", "%", AuthInfo::Password {
hash_method: PasswordHashMethod::Sha256,
hash_value: Vec::from("pass"),
118 changes: 118 additions & 0 deletions src/query/service/tests/it/distributed/cluster.rs
@@ -0,0 +1,118 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::Barrier;
use std::thread;

use common_base::base::tokio;
use common_config::InnerConfig;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use databend_query::api::RpcService;
use databend_query::test_kits::create_query_context_with_cluster;
use databend_query::test_kits::table_test_fixture::execute_query;
use databend_query::test_kits::ClusterDescriptor;
use databend_query::test_kits::ConfigBuilder;
use databend_query::test_kits::TestGlobalServices;
use futures_util::TryStreamExt;
use tokio::runtime::Builder as TokioRuntimeBuilder;

#[test]
fn test_init_singleton() -> Result<()> {
let configs = setup_node_configs(vec![
"0.0.0.0:6061", // Node 1 address
"0.0.0.0:6062", /* Node 2 address
* Add more addresses as needed */
]);
let task_count = configs.len();
let barrier = Arc::new(Barrier::new(task_count + 1));
let mut handles = Vec::with_capacity(task_count);

let cluster_desc = setup_cluster(&configs);

for (i, conf) in configs.into_iter().enumerate() {
let barrier_clone = barrier.clone();
let thread_name = format!("custom-thread-node-{}", i + 1);
let is_special_node = i == task_count - 1; // Make the last node the special one

let conf_clone = conf.clone(); // Clone the configuration as well
let cluster_desc_clone = cluster_desc.clone();

let handle = thread::Builder::new()
.name(thread_name)
.spawn(move || {
let rt = TokioRuntimeBuilder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create runtime");

let inner_async = async move {
let _guard = TestGlobalServices::setup(&conf_clone).await?;

let mut srv = RpcService::create(conf_clone.clone())?;
srv.start(conf_clone.query.flight_api_address.parse()?)
.await?;

if is_special_node {
let ctx = create_query_context_with_cluster(cluster_desc_clone).await?;

let res = execute_query(ctx, "select * from system.clusters").await?;
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
println!("blocks: {:?}", blocks);
}

barrier_clone.wait();
Ok::<(), ErrorCode>(())
};

if let Err(e) = rt.block_on(inner_async) {
eprintln!("Error in async block: {}", e);
}
})
.map_err(|e| ErrorCode::UnknownException(format!("Failed to spawn thread: {}", e)))?;

handles.push(handle);
}

barrier.wait();
for handle in handles {
handle.join().expect("Thread failed to complete");
}

Ok(())
}

fn setup_node_configs(addresses: Vec<&str>) -> Vec<InnerConfig> {
addresses
.into_iter()
.enumerate()
.map(|(i, address)| {
let mut conf = ConfigBuilder::create().build();
conf.query.flight_api_address = address.to_string();
conf.query.cluster_id = format!("node{}", i + 1);
conf
})
.collect()
}

fn setup_cluster(configs: &[InnerConfig]) -> ClusterDescriptor {
let mut cluster_desc = ClusterDescriptor::new();
for conf in configs.iter() {
cluster_desc =
cluster_desc.with_node(&conf.query.cluster_id, &conf.query.flight_api_address);
}
cluster_desc
}
15 changes: 15 additions & 0 deletions src/query/service/tests/it/distributed/mod.rs
@@ -0,0 +1,15 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod cluster;
1 change: 1 addition & 0 deletions src/query/service/tests/it/main.rs
@@ -25,6 +25,7 @@ mod catalogs;
mod clusters;
mod configs;
mod databases;
mod distributed;
mod frame;
mod interpreters;
mod metrics;
3 changes: 1 addition & 2 deletions src/query/service/tests/it/storages/system.rs
@@ -410,8 +410,7 @@ async fn test_caches_table() -> Result<()> {
let file = &mut mint.new_goldenfile("caches_table.txt").unwrap();

let cluster_desc = ClusterDescriptor::new().with_local_id("test-node");
let (_guard, ctx) =
databend_query::test_kits::create_query_context_with_cluster(cluster_desc).await?;
let ctx = databend_query::test_kits::create_query_context_with_cluster(cluster_desc).await?;

let table = CachesTable::create(1);

2 changes: 1 addition & 1 deletion src/query/storages/system/src/log_queue.rs
@@ -94,7 +94,7 @@ impl<Event: SystemLogElement + 'static> SystemLogQueue<Event> {
Some(instance) => instance
.downcast_ref::<Arc<Self>>()
.cloned()
.ok_or(ErrorCode::Internal("")),
.ok_or(ErrorCode::Internal("SystemLogQueue instance get error")),
}
}
}
