Skip to content

Commit

Permalink
fix(fuzz): ensure all regions leases are renewed (#5294)
Browse files Browse the repository at this point in the history
* fix(fuzz): ensure all regions leases are renewed

* fix: fix clippy
  • Loading branch information
WenyXu authored Jan 6, 2025
1 parent 2ad5033 commit b229c94
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tests-fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ chrono = { workspace = true }
common-base = { workspace = true }
common-error = { workspace = true }
common-macro = { workspace = true }
common-meta = { workspace = true }
common-query = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -251,6 +252,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;

tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows
info!("Validates num of rows");

Expand Down
5 changes: 5 additions & 0 deletions tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use common_time::util::current_time_millis;
use futures::future::try_join_all;
Expand Down Expand Up @@ -318,6 +319,10 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;

tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows
info!("Validates num of rows");
for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {
Expand Down
116 changes: 50 additions & 66 deletions tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -252,6 +253,50 @@ async fn wait_for_migration(ctx: &FuzzContext, migration: &Migration, procedure_
.await;
}

async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<()> {
let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
for (migration, procedure_id) in migrations.iter().zip(procedure_ids) {
wait_for_migration(ctx, migration, &procedure_id).await;
}

tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;

Ok(())
}

async fn validate_rows(
ctx: &FuzzContext,
tables: &HashMap<Ident, (TableContextRef, InsertIntoExpr)>,
) -> Result<()> {
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
Ok(())
}

async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
let mut rng = ChaCha20Rng::seed_from_u64(input.seed);
// Creates a physical table.
Expand Down Expand Up @@ -305,36 +350,11 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
}
}

let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");
for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) {
wait_for_migration(&ctx, &migration, &procedure_id).await;
}
migrate_regions(&ctx, &migrations).await?;

// Validates value rows
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
validate_rows(&ctx, &tables).await?;

// Creates more logical tables and inserts values
create_logical_table_and_insert_values(
Expand All @@ -348,17 +368,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.await?;

// Validates value rows
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
validate_rows(&ctx, &tables).await?;

// Recovers region distribution
let migrations = migrations
Expand All @@ -376,23 +386,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
)
.collect::<Vec<_>>();

let mut procedure_ids = Vec::with_capacity(migrations.len());
// Triggers region migrations
for Migration {
from_peer,
to_peer,
region_id,
} in &migrations
{
let procedure_id =
migrate_region(&ctx.greptime, region_id.as_u64(), *from_peer, *to_peer, 120).await;
info!("Migrating region: {region_id} from {from_peer} to {to_peer}, procedure: {procedure_id}");
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");
for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) {
wait_for_migration(&ctx, &migration, &procedure_id).await;
}
migrate_regions(&ctx, &migrations).await?;

// Creates more logical tables and inserts values
create_logical_table_and_insert_values(
Expand All @@ -406,17 +400,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
.await?;

// Validates value rows
info!("Validates num of rows");
for (table_ctx, insert_expr) in tables.values() {
let sql = format!("select count(1) as count from {}", table_ctx.name);
let values = count_values(&ctx.greptime, &sql).await?;
let expected_rows = insert_expr.values_list.len() as u64;
assert_eq!(
values.count as u64, expected_rows,
"Expected rows: {}, got: {}, table: {}",
expected_rows, values.count, table_ctx.name
);
}
validate_rows(&ctx, &tables).await?;

// Clean up
for (table_ctx, _) in tables.values() {
Expand Down
6 changes: 6 additions & 0 deletions tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants;
use common_telemetry::info;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -261,6 +262,11 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
.await;
}

tokio::time::sleep(Duration::from_secs(
distributed_time_constants::REGION_LEASE_SECS,
))
.await;

Ok(())
}

Expand Down

0 comments on commit b229c94

Please sign in to comment.