Skip to content

Commit

Permalink
make cluster nodes show work
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Dec 4, 2023
1 parent 4b19a3a commit eaa9301
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions src/query/service/tests/it/distributed/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::Barrier;
use std::thread;

use common_base::base::tokio;
Expand All @@ -31,21 +29,19 @@ fn test_simple_cluster() -> Result<()> {
let configs = setup_node_configs(vec![
"0.0.0.0:6061", // Node 1 flight address
"0.0.0.0:6062", // Node 2 flight address
"0.0.0.0:6063", // Node 2 flight address
"0.0.0.0:6063", // Node 3 flight address
"0.0.0.0:6064", // Node 4 flight address
"0.0.0.0:6065", // Node 5 flight address
]);

let task_count = configs.len();
let barrier = Arc::new(Barrier::new(task_count + 1));
let mut handles = Vec::with_capacity(task_count);

let cluster_desc = setup_cluster(&configs);

for (i, conf) in configs.into_iter().enumerate() {
let barrier_clone = barrier.clone();
let thread_name = format!("custom-thread-node-{}", i + 1);
let is_special_node = i == task_count - 1; // Make the last node the special one
let is_check_node = i == task_count - 1; // Make the last node the special one

let conf_clone = conf.clone(); // Clone the configuration as well
let cluster_desc_clone = cluster_desc.clone();
Expand All @@ -65,17 +61,38 @@ fn test_simple_cluster() -> Result<()> {
srv.start(conf_clone.query.flight_api_address.parse()?)
.await?;

if is_special_node {
if is_check_node {
// Create the ctx with cluster nodes.
let ctx = fixture
.new_query_ctx_with_cluster(cluster_desc_clone)
.await?;

let res = execute_query(ctx, "select * from system.clusters").await?;
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
common_expression::block_debug::pretty_format_blocks(&blocks).unwrap();
// Check the cluster table.
{
let res = execute_query(
ctx.clone(),
"select name, host, port from system.clusters",
)
.await?;
let blocks = res.try_collect::<Vec<DataBlock>>().await?;
let expected = vec![
"+----------+-----------+----------+",
"| Column 0 | Column 1 | Column 2 |",
"+----------+-----------+----------+",
"| 'node1' | '0.0.0.0' | 6061 |",
"| 'node2' | '0.0.0.0' | 6062 |",
"| 'node3' | '0.0.0.0' | 6063 |",
"| 'node4' | '0.0.0.0' | 6064 |",
"| 'node5' | '0.0.0.0' | 6065 |",
"+----------+-----------+----------+",
];
common_expression::block_debug::assert_blocks_sorted_eq(
expected,
blocks.as_slice(),
);
}
}

barrier_clone.wait();
Ok::<(), ErrorCode>(())
};

Expand All @@ -88,15 +105,14 @@ fn test_simple_cluster() -> Result<()> {
handles.push(handle);
}

barrier.wait();
for handle in handles {
handle.join().expect("Thread failed to complete");
}

Ok(())
}

/// Setup the configurations for the nodes in the cluster
/// Setup the configurations for the nodes in the cluster.
fn setup_node_configs(addresses: Vec<&str>) -> Vec<InnerConfig> {
addresses
.into_iter()
Expand All @@ -110,7 +126,7 @@ fn setup_node_configs(addresses: Vec<&str>) -> Vec<InnerConfig> {
.collect()
}

/// Setup the cluster descriptor for the nodes in the cluster
/// Setup the cluster descriptor for the nodes in the cluster.
fn setup_cluster(configs: &[InnerConfig]) -> ClusterDescriptor {
let mut cluster_desc = ClusterDescriptor::new();
for conf in configs.iter() {
Expand Down

0 comments on commit eaa9301

Please sign in to comment.