diff --git a/src/query/ee/tests/it/aggregating_index/index_refresh.rs b/src/query/ee/tests/it/aggregating_index/index_refresh.rs index 59faef9786670..952c506451ff8 100644 --- a/src/query/ee/tests/it/aggregating_index/index_refresh.rs +++ b/src/query/ee/tests/it/aggregating_index/index_refresh.rs @@ -38,6 +38,9 @@ use databend_query::test_kits::TestFixture; use enterprise_query::test_kits::context::create_ee_query_context; use futures_util::TryStreamExt; +use crate::aggregating_index::CATALOG; +use crate::aggregating_index::DATABASE; + async fn plan_sql(ctx: Arc, sql: &str) -> Result { let mut planner = Planner::new(ctx); let (plan, _) = planner.plan_sql(sql).await?; @@ -113,12 +116,19 @@ async fn test_refresh_agg_index() -> Result<()> { let fixture = TestFixture::new_with_ctx(_guard, ctx).await; // Create table + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql( fixture.ctx(), "CREATE TABLE t0 (a int, b int, c int) storage_format = 'parquet'", ) .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; + // Insert data execute_sql( fixture.ctx(), @@ -129,6 +139,10 @@ async fn test_refresh_agg_index() -> Result<()> { // Create index let index_name = "index0"; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; + let index_id = create_index( fixture.ctx(), index_name, @@ -138,6 +152,9 @@ async fn test_refresh_agg_index() -> Result<()> { .await?; // Refresh Index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, index_name)?; refresh_index(fixture.ctx(), index_name, None).await?; let block_path = find_block_path(&root)?.unwrap(); @@ -156,6 +173,9 @@ async fn test_refresh_agg_index() -> Result<()> { // Check aggregating index is correct. { + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let res = execute_sql( fixture.ctx(), "SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b", @@ -163,6 +183,9 @@ async fn test_refresh_agg_index() -> Result<()> { .await?; let data_blocks: Vec = res.try_collect().await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let agg_res = execute_sql( fixture.ctx(), &format!( @@ -177,6 +200,9 @@ async fn test_refresh_agg_index() -> Result<()> { } // Insert more data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql( fixture.ctx(), "INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -188,6 +214,9 @@ async fn test_refresh_agg_index() -> Result<()> { assert!(blocks.len() > indexes.len()); // Refresh Index again + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; refresh_index(fixture.ctx(), index_name, None).await?; // check the new added index is correct. @@ -206,6 +235,9 @@ async fn test_refresh_agg_index() -> Result<()> { indexes[0].clone() }; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let data_blocks: Vec = execute_sql( fixture.ctx(), &format!( @@ -217,6 +249,9 @@ async fn test_refresh_agg_index() -> Result<()> { .try_collect() .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let agg_data_blocks: Vec = execute_sql( fixture.ctx(), &format!( @@ -228,6 +263,9 @@ async fn test_refresh_agg_index() -> Result<()> { .try_collect() .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; assert_two_blocks_sorted_eq_with_name( "refresh index again", &data_blocks, @@ -259,7 +297,9 @@ async fn test_refresh_agg_index_with_limit() -> Result<()> { // Create index let index_name = "index1"; - + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t1")?; let index_id = create_index( fixture.ctx(), index_name, @@ -269,12 +309,18 @@ async fn test_refresh_agg_index_with_limit() -> Result<()> { .await?; // Insert more data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t1")?; execute_sql( fixture.ctx(), "INSERT INTO t1 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", ) .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t1")?; execute_sql( fixture.ctx(), "INSERT INTO t1 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -282,6 +328,9 @@ async fn test_refresh_agg_index_with_limit() -> Result<()> { .await?; // Refresh index with limit 1 + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t1")?; refresh_index(fixture.ctx(), index_name, Some(1)).await?; let block_path = find_block_path(&root)?.unwrap(); @@ -339,6 +388,9 @@ async fn test_sync_agg_index_after_update() -> Result<()> { .await?; // Insert data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql( ctx.clone(), "INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -356,6 +408,9 @@ async fn test_sync_agg_index_after_update() -> Result<()> { // Check aggregating index_0 is correct. { + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let res = execute_sql( ctx.clone(), "SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b", @@ -381,6 +436,9 @@ async fn test_sync_agg_index_after_update() -> Result<()> { } // Update + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql(ctx.clone(), "UPDATE t0 SET c = 2 WHERE b = 2").await?; let first_block = blocks[0].clone(); @@ -394,6 +452,9 @@ async fn test_sync_agg_index_after_update() -> Result<()> { // Check aggregating index_0 is correct after update. { + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let updated_block = blocks .iter() .find(|b| !b.eq_ignore_ascii_case(&first_block)) @@ -468,6 +529,9 @@ async fn test_sync_agg_index_after_insert() -> Result<()> { .await?; // Insert data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql( ctx.clone(), "INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -489,6 +553,9 @@ async fn test_sync_agg_index_after_insert() -> Result<()> { // Check aggregating index_0 is correct. { + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let res = execute_sql( ctx.clone(), "SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b", @@ -515,6 +582,9 @@ async fn test_sync_agg_index_after_insert() -> Result<()> { // Check aggregating index_1 is correct. { + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let res = execute_sql( ctx.clone(), "SELECT a, SUM_STATE(b) from t0 WHERE c > 1 GROUP BY a", @@ -540,6 +610,9 @@ async fn test_sync_agg_index_after_insert() -> Result<()> { } // Insert more data with insert into ... select ... + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql(ctx.clone(), "INSERT INTO t0 SELECT * FROM t0").await?; let blocks = collect_file_names(&block_path)?; @@ -581,6 +654,9 @@ async fn test_sync_agg_index_after_copy_into() -> Result<()> { .await?; // Copy into data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; execute_sql( ctx.clone(), "COPY INTO books FROM 'https://datafuse-1253727613.cos.ap-hongkong.myqcloud.com/data/books.csv' FILE_FORMAT = (TYPE = CSV);", @@ -598,6 +674,9 @@ async fn test_sync_agg_index_after_copy_into() -> Result<()> { // Check aggregating index_0 is correct. { + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t0")?; let res = execute_sql(ctx.clone(), "SELECT MAX_STATE(title) from books").await?; let data_blocks: Vec = res.try_collect().await?; diff --git a/src/query/ee/tests/it/aggregating_index/index_scan.rs b/src/query/ee/tests/it/aggregating_index/index_scan.rs index 43e1cca22a3bb..610c85c7d6aee 100644 --- a/src/query/ee/tests/it/aggregating_index/index_scan.rs +++ b/src/query/ee/tests/it/aggregating_index/index_scan.rs @@ -34,6 +34,9 @@ use databend_query::test_kits::TestFixture; use enterprise_query::test_kits::context::create_ee_query_context; use futures_util::TryStreamExt; +use crate::aggregating_index::CATALOG; +use crate::aggregating_index::DATABASE; + #[tokio::test(flavor = "multi_thread")] async fn test_index_scan() -> Result<()> { test_index_scan_impl("parquet").await?; @@ -112,6 +115,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { .await?; // Insert data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), "INSERT INTO t VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -119,6 +125,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { .await?; // Create index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let index_name = "index1"; execute_sql( @@ -128,6 +137,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { .await?; // Refresh Index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("REFRESH AGGREGATING INDEX {index_name}"), @@ -135,6 +147,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { .await?; // Query with index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -175,6 +190,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { ) .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql(fixture.ctx(), "SELECT SUM(a) from t WHERE c > 1 GROUP BY b").await?; assert!(is_index_scan_plan(&plan)); @@ -194,6 +212,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { .await?; // Insert new data but not refresh index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), "INSERT INTO t VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -201,6 +222,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { .await?; // Query with one fuse block and one index block + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -223,6 +247,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { ) .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql(fixture.ctx(), "SELECT b + 1 from t WHERE c > 1 GROUP BY b").await?; assert!(is_index_scan_plan(&plan)); @@ -241,6 +268,9 @@ async fn test_index_scan_impl(format: &str) -> Result<()> { ) .await?; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT SUM(a) + 1 from t WHERE c > 1 GROUP BY b", @@ -280,6 +310,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { .await?; // Insert data + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), "INSERT INTO t VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -289,6 +322,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { // Create index let index_name = "index1"; + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("CREATE AGGREGATING INDEX {index_name} AS SELECT b, MAX(a), SUM(a) from t WHERE c > 1 GROUP BY b"), @@ -296,6 +332,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { .await?; // Refresh Index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("REFRESH AGGREGATING INDEX {index_name}"), @@ -304,6 +343,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { // Query with index // sum + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -327,6 +369,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { .await?; // sum and max + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a), MAX(a) from t WHERE c > 1 GROUP BY b", @@ -350,6 +395,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { .await?; // Insert new data but not refresh index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), "INSERT INTO t VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", @@ -358,6 +406,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { // Query with one fuse block and one index block // sum + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -381,6 +432,9 @@ async fn test_index_scan_two_agg_funcs_impl(format: &str) -> Result<()> { .await?; // sum and max + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a), MAX(a) from t WHERE c > 1 GROUP BY b", @@ -428,7 +482,9 @@ async fn test_projected_index_scan_impl(format: &str) -> Result<()> { // Create index let index_name = "index1"; - + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("CREATE AGGREGATING INDEX {index_name} AS SELECT b, MAX(a), SUM(a) from t WHERE c > 1 GROUP BY b"), @@ -436,6 +492,9 @@ async fn test_projected_index_scan_impl(format: &str) -> Result<()> { .await?; // Refresh Index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("REFRESH AGGREGATING INDEX {index_name}"), @@ -444,6 +503,9 @@ async fn test_projected_index_scan_impl(format: &str) -> Result<()> { // Query with index // sum + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql(fixture.ctx(), "SELECT b from t WHERE c > 1 GROUP BY b").await?; assert!(is_index_scan_plan(&plan)); @@ -463,6 +525,9 @@ async fn test_projected_index_scan_impl(format: &str) -> Result<()> { .await?; // sum and max + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -494,6 +559,9 @@ async fn test_projected_index_scan_impl(format: &str) -> Result<()> { // Query with one fuse block and one index block // sum + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -517,6 +585,9 @@ async fn test_projected_index_scan_impl(format: &str) -> Result<()> { .await?; // sum and max + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b", @@ -568,6 +639,9 @@ async fn test_index_scan_with_count_impl(format: &str) -> Result<()> { .await?; // Refresh Index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("REFRESH AGGREGATING INDEX {index_name}"), @@ -575,6 +649,9 @@ async fn test_index_scan_with_count_impl(format: &str) -> Result<()> { .await?; // Query with index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql(fixture.ctx(), "SELECT a, COUNT(*) from t GROUP BY a").await?; assert!(is_index_scan_plan(&plan)); @@ -614,7 +691,9 @@ async fn test_index_scan_agg_args_are_expression_impl(format: &str) -> Result<() // Create index let index_name = "index1"; - + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("CREATE AGGREGATING INDEX {index_name} AS SELECT SUBSTRING(a, 1, 1) as s, sum(length(a)), min(a) from t GROUP BY s"), @@ -622,6 +701,9 @@ async fn test_index_scan_agg_args_are_expression_impl(format: &str) -> Result<() .await?; // Refresh Index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; execute_sql( fixture.ctx(), &format!("REFRESH AGGREGATING INDEX {index_name}"), @@ -629,6 +711,9 @@ async fn test_index_scan_agg_args_are_expression_impl(format: &str) -> Result<() .await?; // Query with index + fixture + .ctx() + .evict_table_from_cache(CATALOG, DATABASE, "t")?; let plan = plan_sql( fixture.ctx(), "SELECT SUBSTRING(a, 1, 1) as s, sum(length(a)), min(a) from t GROUP BY s", diff --git a/src/query/ee/tests/it/aggregating_index/mod.rs b/src/query/ee/tests/it/aggregating_index/mod.rs index 0f92076bb915d..c2a1a3086c8e9 100644 --- a/src/query/ee/tests/it/aggregating_index/mod.rs +++ b/src/query/ee/tests/it/aggregating_index/mod.rs @@ -14,3 +14,6 @@ mod index_refresh; mod index_scan; + +const CATALOG: &str = "default"; +const DATABASE: &str = "default"; diff --git a/src/query/ee/tests/it/storages/fuse/operations/computed_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/computed_columns.rs index 0175e94262d54..8531e27d9f275 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/computed_columns.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/computed_columns.rs @@ -25,6 +25,7 @@ async fn test_computed_column() -> Result<()> { let (_guard, ctx, _) = create_ee_query_context(None).await.unwrap(); let fixture = TestFixture::new_with_ctx(_guard, ctx).await; + let catalog = fixture.default_catalog_name(); let db = fixture.default_db_name(); let tbl = fixture.default_table_name(); let ctx = fixture.ctx(); @@ -66,6 +67,7 @@ async fn test_computed_column() -> Result<()> { // update { + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let update = format!("update {}.{} set c = 'abc', d = 100 where id = 0", db, tbl); let _res = execute_query(ctx.clone(), &update).await?; @@ -82,6 +84,7 @@ async fn test_computed_column() -> Result<()> { "| 5 | '2-1-s' | 'S-1-2' | 14 | 39 | 's-1-2' | 12 |", "+----------+----------+----------+----------+----------+----------+----------+", ]; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; expects_ok( "check update computed columns", execute_query(ctx.clone(), query.as_str()).await, @@ -89,9 +92,11 @@ async fn test_computed_column() -> Result<()> { ) .await?; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let update = format!("update {}.{} set c = 'xyz', d = 30 where b1 = 12", db, tbl); let _res = execute_query(ctx.clone(), &update).await?; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let query = format!("select * from {}.{} order by id", db, tbl); let expected = vec![ "+----------+----------+----------+----------+----------+----------+----------+", @@ -115,9 +120,11 @@ async fn test_computed_column() -> Result<()> { // delete { + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let delete = format!("delete from {}.{} where id >= 4", db, tbl); let _res = execute_query(ctx.clone(), &delete).await?; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let query = format!("select * from {}.{} order by id", db, tbl); let expected = vec![ "+----------+----------+----------+----------+----------+----------+----------+", @@ -136,9 +143,11 @@ async fn test_computed_column() -> Result<()> { ) .await?; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let delete = format!("delete from {}.{} where b1 = 3 or b2 = 9", db, tbl); let _res = execute_query(ctx.clone(), &delete).await?; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let query = format!("select * from {}.{} order by id", db, tbl); let expected = vec![ "+----------+----------+----------+----------+----------+----------+----------+", diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index 64e53f4c05a72..63d0ca938dc84 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -14,488 +14,14 @@ use std::sync::Arc; -use chrono::Duration; -use chrono::Utc; use common_base::base::tokio; use common_catalog::table_context::TableContext; use common_exception::Result; -use common_storages_fuse::FuseTable; use databend_query::test_kits::table_test_fixture::append_sample_data; use databend_query::test_kits::table_test_fixture::check_data_dir; use databend_query::test_kits::table_test_fixture::execute_command; -use databend_query::test_kits::table_test_fixture::execute_query; use databend_query::test_kits::table_test_fixture::TestFixture; -use databend_query::test_kits::utils::generate_orphan_files; -use databend_query::test_kits::utils::generate_snapshot_with_segments; -use databend_query::test_kits::utils::query_count; -use enterprise_query::storages::fuse::do_vacuum; use enterprise_query::storages::fuse::do_vacuum_drop_tables; -use enterprise_query::storages::fuse::operations::vacuum_table::get_snapshot_referenced_files; -use enterprise_query::storages::fuse::operations::vacuum_table::SnapshotReferencedFiles; - -async fn check_files_existence( - fixture: &TestFixture, - exist_files: Vec<&Vec>, - not_exist_files: Vec<&Vec>, -) -> Result<()> { - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let dal = fuse_table.get_operator_ref(); - for file_vec in ¬_exist_files { - for file in file_vec.iter() { - assert!(!dal.is_exist(file).await?); - } - } - - for file_vec in &exist_files { - for file in file_vec.iter() { - assert!(dal.is_exist(file).await?); - } - } - - Ok(()) -} - -async fn check_dry_run_files(fixture: &TestFixture, files: Vec<&Vec>) -> Result<()> { - let ctx = fixture.ctx(); - let table_ctx: Arc = ctx.clone(); - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let retention_time = chrono::Utc::now() - chrono::Duration::seconds(2); - let files_opt = do_vacuum(fuse_table, table_ctx, retention_time, true).await?; - - assert!(files_opt.is_some()); - let purge_files = files_opt.unwrap(); - for file_vec in &files { - for file in file_vec.iter() { - assert!(purge_files.contains(file)); - } - } - let mut count = 0; - files.iter().for_each(|f| count += f.len()); - assert_eq!(purge_files.len(), count); - Ok(()) -} - -// return all the snapshot files and referenced files. -async fn get_snapshots_and_referenced_files( - fixture: &TestFixture, -) -> Result<(SnapshotReferencedFiles, Vec)> { - let ctx = fixture.ctx(); - let table_ctx: Arc = ctx.clone(); - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let referenced_files = get_snapshot_referenced_files(fuse_table, &table_ctx) - .await? - .unwrap(); - let snapshot_files = fuse_table.list_snapshot_files().await?; - - Ok((referenced_files, snapshot_files)) -} - -// return generate orphan files and snapshot file(optional). -async fn generate_test_orphan_files( - fixture: &TestFixture, - genetate_snapshot: bool, - orphan_segment_file_num: usize, - orphan_block_per_segment_num: usize, -) -> Result<(Vec, Option)> { - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let files = generate_orphan_files( - fuse_table, - orphan_segment_file_num, - orphan_block_per_segment_num, - ) - .await?; - - let snapshot_opt = if genetate_snapshot { - let mut segments = Vec::with_capacity(files.len()); - files.iter().for_each(|(location, _)| { - segments.push(location.clone()); - }); - - let new_timestamp = Utc::now() - Duration::minutes(1); - let snapshot_location = - generate_snapshot_with_segments(fuse_table, segments, Some(new_timestamp)).await?; - - Some(snapshot_location) - } else { - None - }; - - let mut orphan_files = vec![]; - for (location, segment) in files { - orphan_files.push(location.0); - for block_meta in segment.blocks { - orphan_files.push(block_meta.location.0.clone()); - if let Some(block_index) = &block_meta.bloom_filter_index_location { - orphan_files.push(block_index.0.clone()); - } - } - } - - Ok((orphan_files, snapshot_opt)) -} - -#[tokio::test(flavor = "multi_thread")] -async fn test_fuse_do_vacuum() -> Result<()> { - // verifies that: - // - // - purge orphan {segments|blocks|blocks index} files, but that within retention period shall not be collected - // - // for example : - // - // ──┬── S_old - // │ - // ──┬── S_orphan - // │ - // ──┬── S_current - // │ - // within retention orphan files - // │ - // │ within_retention_time_orphan_files - // ──┴── - // outof retention orphan files - // │ outof_retention_time_orphan_files - // - // - S_old is the old gc root - // all the {segments|blocks|blocks index} referenced by S_old MUST NOT be collected - // - // - S_orphan is the orphan snapshot - // all the {segments|blocks|blocks index} referenced by S_orphan MUST NOT be collected - // - // - S_current is the gc root - // all the {segments|blocks|blocks index} referenced by S_current MUST NOT be collected - // - // - within_retention_time_orphan_files shall NOT be purged - // since they are within retention time. - // - outof_retention_time_orphan_files should be purged - // since it is outof the retention period - // - // put them together, after GC, there will be - // - 2 snapshots left: s_current, s_old - // - 3 segments left: two referenced by S_current and s_old, one in within_retention_time_orphan_files - // - 2 blocks left: two referenced by S_current and old, one in within_retention_time_orphan_files - // - // this test case test that: - // - all the files referenced by snapshot(no matter if or not is an orphan snapshot), - // if or not within retention time, will not be purged; - // - all the orphan files that not referenced by any snapshots: - // - if within retention, will not be purged - // - if outof retention, will be purged - - let fixture = TestFixture::new().await; - let ctx = fixture.ctx(); - let table_ctx: Arc = ctx.clone(); - table_ctx.get_settings().set_retention_period(0)?; - fixture.create_default_table().await?; - - let count_qry = format!( - "select count(*) from {}.{}", - fixture.default_db_name(), - fixture.default_table_name() - ); - assert_eq!(0, { - let stream = execute_query(fixture.ctx(), &count_qry).await?; - query_count(stream).await? as usize - }); - - let orphan_segment_file_num = 1; - let orphan_block_per_segment_num = 1; - - // generate some orphan files out of retention time - let (outof_retention_time_orphan_files, _) = generate_test_orphan_files( - &fixture, - false, - orphan_segment_file_num, - orphan_block_per_segment_num, - ) - .await?; - - let number_of_block = 1; - append_sample_data(number_of_block, &fixture).await?; - - let first_append_data_count = { - let stream = execute_query(fixture.ctx(), &count_qry).await?; - query_count(stream).await? as usize - }; - assert!(first_append_data_count > 0); - - // generate orphan snapshot and segments - let (mut orphan_snapshot_orphan_files, exist_orphan_snapshot_file) = - generate_test_orphan_files( - &fixture, - true, - orphan_segment_file_num, - orphan_block_per_segment_num, - ) - .await?; - orphan_snapshot_orphan_files.push(exist_orphan_snapshot_file.clone().unwrap()); - - tokio::time::sleep(std::time::Duration::from_secs(3)).await; - - // generate some orphan files within retention time - let (within_retention_time_orphan_files, _) = generate_test_orphan_files( - &fixture, - false, - orphan_segment_file_num, - orphan_block_per_segment_num, - ) - .await?; - - // after generate orphan files and insert data, verify the files number - { - let expected_num_of_snapshot = 2; - let expected_num_of_segment = 1 + (orphan_segment_file_num * 3) as u32; - let expected_num_of_blocks = - 1 + (orphan_segment_file_num * orphan_block_per_segment_num * 3) as u32; - let expected_num_of_index = expected_num_of_blocks; - check_data_dir( - &fixture, - "do_gc_orphan_files: verify generate retention files", - expected_num_of_snapshot, - 0, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - None, - None, - ) - .await?; - } - - // insert data to make some new snapshot files. - append_sample_data(number_of_block, &fixture).await?; - - // after generate orphan files\insert data\analyze table, verify the files number - { - let expected_num_of_snapshot = 3; - let expected_num_of_segment = 2 + (orphan_segment_file_num * 3) as u32; - let expected_num_of_blocks = - 2 + (orphan_segment_file_num * orphan_block_per_segment_num * 3) as u32; - let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_ts = 0; - check_data_dir( - &fixture, - "do_gc_orphan_files: verify generate retention files and referenced files", - expected_num_of_snapshot, - expected_num_of_ts, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - Some(()), - None, - ) - .await?; - } - - let data_count = { - let stream = execute_query(fixture.ctx(), &count_qry).await?; - query_count(stream).await? as usize - }; - assert_eq!(data_count, first_append_data_count * 2); - - // do dry_run gc - let purged_files = vec![ - &outof_retention_time_orphan_files, - &orphan_snapshot_orphan_files, - ]; - check_dry_run_files(&fixture, purged_files.clone()).await?; - - // check that after dry_run gc, files number has not changed - { - let expected_num_of_snapshot = 3; - let expected_num_of_segment = 2 + (orphan_segment_file_num * 3) as u32; - let expected_num_of_blocks = - 2 + (orphan_segment_file_num * orphan_block_per_segment_num * 3) as u32; - let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_ts = 0; - check_data_dir( - &fixture, - "do_gc_orphan_files: verify generate retention files and referenced files", - expected_num_of_snapshot, - expected_num_of_ts, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - Some(()), - None, - ) - .await?; - } - - // first set retention_period as a bigger value cause `do_vacuum` does not vacuum files - table_ctx.get_settings().set_retention_period(1024)?; - - // do gc. - { - let table_ctx: Arc = ctx.clone(); - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let retention_time = chrono::Utc::now() - chrono::Duration::seconds(2); - do_vacuum(fuse_table, table_ctx, retention_time, false).await?; - } - - // check that after do_vacuum, files number has not changed - { - let expected_num_of_snapshot = 3; - let expected_num_of_segment = 2 + (orphan_segment_file_num * 3) as u32; - let expected_num_of_blocks = - 2 + (orphan_segment_file_num * orphan_block_per_segment_num * 3) as u32; - let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_ts = 0; - check_data_dir( - &fixture, - "do_gc_orphan_files: verify generate retention files and referenced files", - expected_num_of_snapshot, - expected_num_of_ts, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - Some(()), - None, - ) - .await?; - } - - // set retention_period as a smaller value cause `do_vacuum` gc files - table_ctx.get_settings().set_retention_period(0)?; - - // do gc. - { - let table_ctx: Arc = ctx.clone(); - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let retention_time = chrono::Utc::now() - chrono::Duration::seconds(2); - do_vacuum(fuse_table, table_ctx, retention_time, false).await?; - } - - assert_eq!(data_count, { - let stream = execute_query(fixture.ctx(), &count_qry).await?; - query_count(stream).await? as usize - }); - - // check files number - { - let expected_num_of_snapshot = 2; - let expected_num_of_segment = 2 + orphan_segment_file_num as u32; - let expected_num_of_blocks = - 2 + (orphan_segment_file_num * orphan_block_per_segment_num) as u32; - let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_ts = 0; - check_data_dir( - &fixture, - "do_gc_orphan_files: after gc", - expected_num_of_snapshot, - expected_num_of_ts, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - Some(()), - None, - ) - .await?; - } - - check_files_existence( - &fixture, - vec![&within_retention_time_orphan_files], - purged_files.clone(), - ) - .await?; - - // save all the referenced files - let (old_referenced_files, old_snapshot_files) = - get_snapshots_and_referenced_files(&fixture).await?; - - // sleep more time to make `within_retention_time_orphan_files` out of retention time - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - - // after sleep, check that if `within_retention_time_orphan_files` has been purged. - - // do dry_run gc - check_dry_run_files(&fixture, vec![&within_retention_time_orphan_files]).await?; - - // check that after dry_run gc, files number has not changed - { - let expected_num_of_snapshot = 2; - let expected_num_of_segment = 2 + orphan_segment_file_num as u32; - let expected_num_of_blocks = - 2 + (orphan_segment_file_num * orphan_block_per_segment_num) as u32; - let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_ts = 0; - check_data_dir( - &fixture, - "do_gc_orphan_files: after gc", - expected_num_of_snapshot, - expected_num_of_ts, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - Some(()), - None, - ) - .await?; - } - - // do gc. - { - let table_ctx: Arc = ctx.clone(); - let table = fixture.latest_default_table().await?; - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let retention_time = chrono::Utc::now() - chrono::Duration::seconds(2); - do_vacuum(fuse_table, table_ctx, retention_time, false).await?; - } - - // check files number - { - let expected_num_of_snapshot = 2; - let expected_num_of_segment = 2 + (orphan_segment_file_num - 1) as u32; - let expected_num_of_blocks = - 2 + ((orphan_segment_file_num - 1) * orphan_block_per_segment_num) as u32; - let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_ts = 0; - check_data_dir( - &fixture, - "do_gc_orphan_files: after gc", - expected_num_of_snapshot, - expected_num_of_ts, - expected_num_of_segment, - expected_num_of_blocks, - expected_num_of_index, - Some(()), - None, - ) - .await?; - } - - check_files_existence(&fixture, vec![], vec![&within_retention_time_orphan_files]).await?; - - // check all referenced files and snapshot files exist - { - let (referenced_files, snapshot_files) = - get_snapshots_and_referenced_files(&fixture).await?; - - assert_eq!(referenced_files, old_referenced_files); - assert_eq!(old_snapshot_files, snapshot_files); - - check_files_existence( - &fixture, - vec![&referenced_files.all_files(), &snapshot_files], - vec![&within_retention_time_orphan_files], - ) - .await?; - } - - // check data count matched. - assert_eq!(data_count, { - let stream = execute_query(fixture.ctx(), &count_qry).await?; - query_count(stream).await? as usize - }); - - Ok(()) -} #[tokio::test(flavor = "multi_thread")] async fn test_fuse_do_vacuum_drop_table() -> Result<()> { diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs new file mode 100644 index 0000000000000..26e9f6db512b3 --- /dev/null +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -0,0 +1,737 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::collections::HashSet; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use common_base::base::tokio; +use common_base::base::Progress; +use common_base::base::ProgressValues; +use common_catalog::catalog::Catalog; +use common_catalog::cluster_info::Cluster; +use common_catalog::database::Database; +use common_catalog::plan::DataSourcePlan; +use common_catalog::plan::PartInfoPtr; +use common_catalog::plan::Partitions; +use common_catalog::query_kind::QueryKind; +use common_catalog::table::Table; +use common_catalog::table_context::MaterializedCtesBlocks; +use common_catalog::table_context::ProcessInfo; +use common_catalog::table_context::StageAttachment; +use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::DataBlock; +use common_expression::FunctionContext; +use common_io::prelude::FormatSettings; +use common_meta_app::principal::FileFormatParams; +use common_meta_app::principal::OnErrorMode; +use common_meta_app::principal::RoleInfo; +use common_meta_app::principal::UserInfo; +use common_meta_app::schema::CatalogInfo; +use common_meta_app::schema::CountTablesReply; +use common_meta_app::schema::CountTablesReq; +use common_meta_app::schema::CreateDatabaseReply; +use common_meta_app::schema::CreateDatabaseReq; +use common_meta_app::schema::CreateIndexReply; +use common_meta_app::schema::CreateIndexReq; +use common_meta_app::schema::CreateLockRevReply; +use common_meta_app::schema::CreateLockRevReq; +use common_meta_app::schema::CreateTableReply; +use common_meta_app::schema::CreateTableReq; +use common_meta_app::schema::CreateVirtualColumnReply; +use common_meta_app::schema::CreateVirtualColumnReq; +use common_meta_app::schema::DeleteLockRevReq; +use common_meta_app::schema::DropDatabaseReply; +use common_meta_app::schema::DropDatabaseReq; +use common_meta_app::schema::DropIndexReply; +use common_meta_app::schema::DropIndexReq; +use common_meta_app::schema::DropTableByIdReq; +use common_meta_app::schema::DropTableReply; +use common_meta_app::schema::DropVirtualColumnReply; +use common_meta_app::schema::DropVirtualColumnReq; +use common_meta_app::schema::ExtendLockRevReq; +use common_meta_app::schema::GetIndexReply; +use common_meta_app::schema::GetIndexReq; +use common_meta_app::schema::GetTableCopiedFileReply; +use common_meta_app::schema::GetTableCopiedFileReq; +use common_meta_app::schema::IndexMeta; +use common_meta_app::schema::ListIndexesByIdReq; +use common_meta_app::schema::ListIndexesReq; +use common_meta_app::schema::ListLockRevReq; +use common_meta_app::schema::ListVirtualColumnsReq; +use common_meta_app::schema::LockMeta; +use common_meta_app::schema::RenameDatabaseReply; +use common_meta_app::schema::RenameDatabaseReq; +use common_meta_app::schema::RenameTableReply; +use common_meta_app::schema::RenameTableReq; +use common_meta_app::schema::SetTableColumnMaskPolicyReply; +use common_meta_app::schema::SetTableColumnMaskPolicyReq; +use common_meta_app::schema::TableIdent; +use common_meta_app::schema::TableInfo; +use common_meta_app::schema::TableMeta; +use common_meta_app::schema::TruncateTableReply; +use common_meta_app::schema::TruncateTableReq; +use common_meta_app::schema::UndropDatabaseReply; +use common_meta_app::schema::UndropDatabaseReq; +use common_meta_app::schema::UndropTableReply; +use common_meta_app::schema::UndropTableReq; +use common_meta_app::schema::UpdateIndexReply; +use common_meta_app::schema::UpdateIndexReq; +use common_meta_app::schema::UpdateTableMetaReply; +use common_meta_app::schema::UpdateTableMetaReq; +use common_meta_app::schema::UpdateVirtualColumnReply; +use common_meta_app::schema::UpdateVirtualColumnReq; +use common_meta_app::schema::UpsertTableOptionReply; +use common_meta_app::schema::UpsertTableOptionReq; +use common_meta_app::schema::VirtualColumnMeta; +use common_meta_types::MetaId; +use common_pipeline_core::InputError; +use common_settings::ChangeValue; +use common_settings::Settings; +use common_sql::Planner; +use common_storage::CopyStatus; +use common_storage::DataOperator; +use common_storage::FileStatus; +use common_storage::StageFileInfo; +use common_users::GrantObjectVisibilityChecker; +use dashmap::DashMap; +use databend_query::sessions::QueryContext; +use databend_query::test_kits::table_test_fixture::TestFixture; +use parking_lot::Mutex; +use parking_lot::RwLock; +use storages_common_table_meta::meta::Location; + +type MetaType = (String, String, String); + +#[derive(Clone, Debug)] +struct FakedCatalog { + cat: Arc, + error_injection: Option, +} + +#[async_trait::async_trait] +impl Catalog for FakedCatalog { + fn name(&self) -> String { + "FakedCatalog".to_string() + } + + fn info(&self) -> CatalogInfo { + self.cat.info() + } + + async fn get_database(&self, _tenant: &str, _db_name: &str) -> Result> { + todo!() + } + + async fn list_databases(&self, _tenant: &str) -> Result>> { + todo!() + } + + async fn create_database(&self, _req: CreateDatabaseReq) -> Result { + todo!() + } + + async fn drop_database(&self, _req: DropDatabaseReq) -> Result { + todo!() + } + + async fn undrop_database(&self, _req: UndropDatabaseReq) -> Result { + todo!() + } + + async fn rename_database(&self, _req: RenameDatabaseReq) -> Result { + todo!() + } + + fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { + self.cat.get_table_by_info(table_info) + } + + async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<(TableIdent, Arc)> { + self.cat.get_table_meta_by_id(table_id).await + } + + async fn get_table( + &self, + tenant: &str, + db_name: &str, + table_name: &str, + ) -> Result> { + self.cat.get_table(tenant, db_name, table_name).await + } + + async fn list_tables(&self, _tenant: &str, _db_name: &str) -> Result>> { + todo!() + } + + async fn list_tables_history( + &self, + _tenant: &str, + _db_name: &str, + ) -> Result>> { + todo!() + } + + async fn create_table(&self, _req: CreateTableReq) -> Result { + todo!() + } + + async fn drop_table_by_id(&self, _req: DropTableByIdReq) -> Result { + todo!() + } + + async fn undrop_table(&self, _req: UndropTableReq) -> Result { + todo!() + } + + async fn rename_table(&self, _req: RenameTableReq) -> Result { + todo!() + } + + async fn upsert_table_option( + &self, + _tenant: &str, + _db_name: &str, + _req: UpsertTableOptionReq, + ) -> Result { + todo!() + } + + async fn update_table_meta( + &self, + table_info: &TableInfo, + req: UpdateTableMetaReq, + ) -> Result { + if let Some(e) = &self.error_injection { + Err(e.clone()) + } else { + self.cat.update_table_meta(table_info, req).await + } + } + + async fn set_table_column_mask_policy( + &self, + _req: SetTableColumnMaskPolicyReq, + ) -> Result { + todo!() + } + + async fn count_tables(&self, _req: CountTablesReq) -> Result { + todo!() + } + + async fn get_table_copied_file_info( + &self, + _tenant: &str, + _db_name: &str, + _req: GetTableCopiedFileReq, + ) -> Result { + todo!() + } + + async fn truncate_table( + &self, + _table_info: &TableInfo, + _req: TruncateTableReq, + ) -> Result { + todo!() + } + + #[async_backtrace::framed] + async fn create_index(&self, _req: CreateIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn drop_index(&self, _req: DropIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn get_index(&self, _req: GetIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn update_index(&self, _req: UpdateIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_indexes(&self, _req: ListIndexesReq) -> Result> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_index_ids_by_table_id(&self, _req: ListIndexesByIdReq) -> Result> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_indexes_by_table_id( + &self, + _req: ListIndexesByIdReq, + ) -> Result> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn create_virtual_column( + &self, + _req: CreateVirtualColumnReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn update_virtual_column( + &self, + _req: UpdateVirtualColumnReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn drop_virtual_column( + &self, + _req: DropVirtualColumnReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_virtual_columns( + &self, + _req: ListVirtualColumnsReq, + ) -> Result> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + todo!() + } + + async fn list_lock_revisions(&self, _req: ListLockRevReq) -> Result> { + todo!() + } + + async fn create_lock_revision(&self, _req: CreateLockRevReq) -> Result { + todo!() + } + + async fn extend_lock_revision(&self, _req: ExtendLockRevReq) -> Result<()> { + todo!() + } + + async fn delete_lock_revision(&self, _req: DeleteLockRevReq) -> Result<()> { + todo!() + } +} + +struct CtxDelegation { + ctx: Arc, + cat: FakedCatalog, + cache: Mutex>>, + table_from_cache: AtomicUsize, + table_without_cache: AtomicUsize, +} + +impl CtxDelegation { + fn new(ctx: Arc, faked_cat: FakedCatalog) -> Self { + Self { + ctx, + cat: faked_cat, + cache: Mutex::new(HashMap::new()), + table_from_cache: AtomicUsize::new(0), + table_without_cache: AtomicUsize::new(0), + } + } +} + +#[async_trait::async_trait] +impl TableContext for CtxDelegation { + fn as_any(&self) -> &dyn Any { + self + } + + fn build_table_from_source_plan(&self, _plan: &DataSourcePlan) -> Result> { + todo!() + } + + fn incr_total_scan_value(&self, _value: ProgressValues) { + todo!() + } + + fn get_total_scan_value(&self) -> ProgressValues { + todo!() + } + + fn get_scan_progress(&self) -> Arc { + todo!() + } + + fn get_scan_progress_value(&self) -> ProgressValues { + todo!() + } + + fn get_write_progress(&self) -> Arc { + self.ctx.get_write_progress() + } + + fn get_join_spill_progress(&self) -> Arc { + self.ctx.get_join_spill_progress() + } + + fn get_aggregate_spill_progress(&self) -> Arc { + self.ctx.get_aggregate_spill_progress() + } + + fn get_group_by_spill_progress(&self) -> Arc { + self.ctx.get_group_by_spill_progress() + } + + fn get_write_progress_value(&self) -> ProgressValues { + todo!() + } + + fn get_join_spill_progress_value(&self) -> ProgressValues { + todo!() + } + + fn get_group_by_spill_progress_value(&self) -> ProgressValues { + todo!() + } + + fn get_aggregate_spill_progress_value(&self) -> ProgressValues { + todo!() + } + + fn get_result_progress(&self) -> Arc { + todo!() + } + + fn get_result_progress_value(&self) -> ProgressValues { + todo!() + } + + fn get_status_info(&self) -> String { + "".to_string() + } + + fn set_status_info(&self, _info: &str) {} + + fn get_partition(&self) -> Option { + todo!() + } + + fn get_partitions(&self, _: usize) -> Vec { + todo!() + } + + fn set_partitions(&self, _partitions: Partitions) -> Result<()> { + todo!() + } + + fn add_partitions_sha(&self, _sha: String) { + todo!() + } + + fn get_partitions_shas(&self) -> Vec { + todo!() + } + + fn get_cacheable(&self) -> bool { + todo!() + } + + fn set_cacheable(&self, _: bool) { + todo!() + } + + fn get_can_scan_from_agg_index(&self) -> bool { + self.ctx.get_can_scan_from_agg_index() + } + fn set_can_scan_from_agg_index(&self, _: bool) { + todo!() + } + + fn attach_query_str(&self, _kind: QueryKind, _query: String) { + todo!() + } + + fn get_query_str(&self) -> String { + todo!() + } + + fn get_fragment_id(&self) -> usize { + todo!() + } + + async fn get_catalog(&self, catalog_name: &str) -> Result> { + self.ctx.get_catalog(catalog_name).await + } + + fn get_default_catalog(&self) -> Result> { + self.ctx.get_default_catalog() + } + + fn get_id(&self) -> String { + self.ctx.get_id() + } + + fn get_current_catalog(&self) -> String { + "default".to_owned() + } + + fn check_aborting(&self) -> Result<()> { + todo!() + } + + fn get_error(&self) -> Option { + todo!() + } + + fn get_current_database(&self) -> String { + self.ctx.get_current_database() + } + + fn get_current_user(&self) -> Result { + todo!() + } + + fn get_current_role(&self) -> Option { + todo!() + } + async fn get_available_roles(&self) -> Result> { + todo!() + } + + async fn get_visibility_checker(&self) -> Result { + todo!() + } + + fn get_fuse_version(&self) -> String { + todo!() + } + + fn get_format_settings(&self) -> Result { + todo!() + } + + fn get_tenant(&self) -> String { + self.ctx.get_tenant() + } + + fn get_query_kind(&self) -> QueryKind { + todo!() + } + + fn get_function_context(&self) -> Result { + self.ctx.get_function_context() + } + + fn get_connection_id(&self) -> String { + todo!() + } + + fn get_settings(&self) -> Arc { + Settings::create("fake_settings".to_string()) + } + + fn get_shared_settings(&self) -> Arc { + todo!() + } + + fn get_cluster(&self) -> Arc { + self.ctx.get_cluster() + } + + fn get_processes_info(&self) -> Vec { + todo!() + } + + fn get_stage_attachment(&self) -> Option { + todo!() + } + + fn get_last_query_id(&self, _index: i32) -> String { + todo!() + } + fn get_query_id_history(&self) -> HashSet { + todo!() + } + fn get_result_cache_key(&self, _query_id: &str) -> Option { + todo!() + } + fn set_query_id_result_cache(&self, _query_id: String, _result_cache_key: String) { + todo!() + } + + fn get_on_error_map(&self) -> Option>>> { + todo!() + } + fn set_on_error_map(&self, _map: Arc>>) { + todo!() + } + fn get_on_error_mode(&self) -> Option { + todo!() + } + fn set_on_error_mode(&self, _mode: OnErrorMode) { + todo!() + } + fn get_maximum_error_per_file(&self) -> Option> { + todo!() + } + + fn apply_changed_settings(&self, _changes: HashMap) -> Result<()> { + todo!() + } + + fn get_changed_settings(&self) -> HashMap { + todo!() + } + + fn get_data_operator(&self) -> Result { + self.ctx.get_data_operator() + } + + async fn get_file_format(&self, _name: &str) -> Result { + todo!() + } + + async fn get_table( + &self, + _catalog: &str, + database: &str, + table: &str, + ) -> Result> { + let tenant = self.ctx.get_tenant(); + let db = database.to_string(); + let tbl = table.to_string(); + let table_meta_key = (tenant, db, tbl); + let already_in_cache = { self.cache.lock().contains_key(&table_meta_key) }; + if already_in_cache { + self.table_from_cache + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(self + .cache + .lock() + .get(&table_meta_key) + .ok_or_else(|| ErrorCode::Internal("Logical error, it's a bug."))? + .clone()) + } else { + self.table_without_cache + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let tbl = self + .cat + .get_table(self.ctx.get_tenant().as_str(), database, table) + .await?; + let tbl2 = tbl.clone(); + let mut guard = self.cache.lock(); + guard.insert(table_meta_key, tbl); + Ok(tbl2) + } + } + + async fn filter_out_copied_files( + &self, + _catalog_name: &str, + _database_name: &str, + _table_name: &str, + _files: &[StageFileInfo], + _max_files: Option, + ) -> Result> { + todo!() + } + + fn set_materialized_cte( + &self, + _idx: (usize, usize), + _blocks: Arc>>, + ) -> Result<()> { + todo!() + } + + fn get_materialized_cte( + &self, + _idx: (usize, usize), + ) -> Result>>>> { + todo!() + } + + fn get_materialized_ctes(&self) -> MaterializedCtesBlocks { + todo!() + } + + fn add_segment_location(&self, _segment_loc: Location) -> Result<()> { + todo!() + } + + fn get_segment_locations(&self) -> Result> { + todo!() + } + + fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> { + todo!() + } + + fn get_copy_status(&self) -> Arc { + todo!() + } + + fn get_license_key(&self) -> String { + self.ctx.get_license_key() + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_same_table_once() -> Result<()> { + let fixture = TestFixture::new().await; + let query = format!( + "select * from {}.{} join {}.{} as t2 join {}.{} as t3", + fixture.default_db_name().as_str(), + fixture.default_table_name().as_str(), + fixture.default_db_name().as_str(), + fixture.default_table_name().as_str(), + fixture.default_db_name().as_str(), + fixture.default_table_name().as_str() + ); + fixture.create_default_table().await?; + let ctx = fixture.ctx(); + let catalog = ctx.get_catalog("default").await?; + let faked_catalog = FakedCatalog { + cat: catalog, + error_injection: None, + }; + + let ctx = Arc::new(CtxDelegation::new(ctx, faked_catalog)); + + let mut planner = Planner::new(ctx.clone()); + let (_, _) = planner.plan_sql(query.as_str()).await?; + assert_eq!( + ctx.table_without_cache + .load(std::sync::atomic::Ordering::SeqCst), + 1 + ); + assert_eq!( + ctx.table_from_cache + .load(std::sync::atomic::Ordering::SeqCst), + 2 + ); + Ok(()) +} diff --git a/src/query/service/tests/it/sql/exec/mod.rs b/src/query/service/tests/it/sql/exec/mod.rs index 29c503336dc61..db4e7d87d5273 100644 --- a/src/query/service/tests/it/sql/exec/mod.rs +++ b/src/query/service/tests/it/sql/exec/mod.rs @@ -12,6 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::base::tokio; +use common_base::runtime::Runtime; +use common_base::runtime::TrySpawn; +use common_base::GLOBAL_TASK; +use common_exception::ErrorCode; +use common_exception::Result; +use common_sql::plans::Plan; +use common_sql::Planner; +use common_storages_fuse::FuseTable; +use databend_query::interpreters::Interpreter; +use databend_query::interpreters::OptimizeTableInterpreter; +use databend_query::test_kits::TestFixture; +use futures_util::TryStreamExt; + #[test] pub fn test_format_field_name() { use databend_query::sql::executor::decode_field_name; @@ -22,3 +36,127 @@ pub fn test_format_field_name() { let (decoded_name, decoded_index) = decode_field_name(field_name.as_str()).unwrap(); assert!(decoded_name == display_name && decoded_index == index); } + +#[tokio::test(flavor = "multi_thread")] +pub async fn test_snapshot_consistency() -> Result<()> { + let fixture = TestFixture::new().await; + let ctx = fixture.ctx(); + let tbl = fixture.default_table_name(); + let db = fixture.default_db_name(); + fixture.create_default_table().await?; + + let db2 = db.clone(); + let tbl2 = tbl.clone(); + + let runtime = Runtime::with_default_worker_threads()?; + + // 1. insert into tbl + let mut planner = Planner::new(ctx.clone()); + let mut planner2 = Planner::new(ctx.clone()); + // generate 3 segments + // insert 3 times + let n = 3; + for _ in 0..n { + let table = fixture.latest_default_table().await?; + let num_blocks = 1; + let stream = TestFixture::gen_sample_blocks_stream(num_blocks, 1); + + let blocks = stream.try_collect().await?; + fixture + .append_commit_blocks(table.clone(), blocks, false, true) + .await?; + } + + let query_task = async move { + // 2. test compact and select concurrency + let query = format!( + "select * from {}.{} join (select id,t from {}.{} as t2 where id > 1 and id < 100000)", + db, tbl, db, tbl + ); + + // a. thread 1: read table + let (query_plan, _) = planner.plan_sql(&query).await?; + + if let Plan::Query { + s_expr: _s_expr, + metadata, + bind_context: _bind_context, + rewrite_kind: _rewrite_kind, + formatted_ast: _formatted_ast, + ignore_result: _ignore_result, + } = query_plan + { + let tbl_entries = { + let meta = metadata.read(); + meta.tables().to_vec() + }; + let mut tables = Vec::with_capacity(2); + for entry in tbl_entries { + if entry.name() == tbl { + tables.push(entry.table()); + } + } + assert_eq!(tables.len(), 2); + let table0 = tables[0].clone(); + let table1 = tables[1].clone(); + + let fuse_table0 = table0 + .as_any() + .downcast_ref::() + .ok_or(ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support", + table0.name(), + table0.get_table_info().engine(), + ))) + .unwrap(); + let snapshot0 = fuse_table0.read_table_snapshot().await?; + + let fuse_table1 = table1 + .as_any() + .downcast_ref::() + .ok_or(ErrorCode::Unimplemented(format!( + "table {}, engine type {}, does not support", + table1.name(), + table1.get_table_info().engine(), + ))) + .unwrap(); + let snapshot1 = fuse_table1.read_table_snapshot().await?; + + let res = match (snapshot0, snapshot1) { + (None, None) => true, + (None, Some(_)) => false, + (Some(_), None) => false, + (Some(a), Some(b)) => a.segments == b.segments, + }; + if !res { + return Err(ErrorCode::BadArguments("snapshot consistency failed")); + } + } else { + return Err(ErrorCode::BadArguments("query bad plan")); + } + Ok::<(), ErrorCode>(()) + }; + + let query_handler = runtime.spawn(GLOBAL_TASK, query_task); + + let compact_task = async move { + let compact_sql = format!("optimize table {}.{} compact", db2, tbl2); + let (compact_plan, _) = planner2.plan_sql(&compact_sql).await?; + if let Plan::OptimizeTable(plan) = compact_plan { + let optimize_interpreter = + OptimizeTableInterpreter::try_create(ctx.clone(), *plan.clone())?; + optimize_interpreter.execute(ctx).await?; + } + Ok::<(), ErrorCode>(()) + }; + + // b. thread2: optmize table + let compact_handler = runtime.spawn(GLOBAL_TASK, compact_task); + + query_handler.await.unwrap()?; + compact_handler.await.unwrap()?; + + Ok(()) +} + +mod get_table_bind_test; diff --git a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs index 256ac728775bc..53f9d23a24045 100644 --- a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs +++ b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs @@ -140,6 +140,7 @@ async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<( #[tokio::test(flavor = "multi_thread")] async fn test_internal_column() -> Result<()> { let fixture = TestFixture::new().await; + let catalog = fixture.default_catalog_name(); let db = fixture.default_db_name(); let tbl = fixture.default_table_name(); let ctx = fixture.ctx(); @@ -187,6 +188,7 @@ async fn test_internal_column() -> Result<()> { check_data_block(expected, blocks)?; // do compact + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let query = format!("optimize table {db}.{tbl} compact"); let mut planner = Planner::new(ctx.clone()); let (plan, _) = planner.plan_sql(&query).await?; @@ -194,6 +196,7 @@ async fn test_internal_column() -> Result<()> { let data_stream = interpreter.execute(ctx.clone()).await?; let _ = data_stream.try_collect::>().await; + ctx.evict_table_from_cache(&catalog, &db, &tbl)?; let query = format!( "select _row_id,_snapshot_name,_segment_name,_block_name from {}.{} order by _row_id", db, tbl diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index b024dbe6d42f9..b81812f5bd120 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -82,6 +82,7 @@ async fn test_compact_segment_normal_case() -> Result<()> { append_rows(ctx.clone(), num_inserts).await?; // check count + ctx.evict_table_from_cache("default", "default", "t")?; let count_qry = "select count(*) from t"; let stream = execute_query(fixture.ctx(), count_qry).await?; assert_eq!(9, check_count(stream).await?); @@ -97,12 +98,14 @@ async fn test_compact_segment_normal_case() -> Result<()> { mutator.try_commit(table.clone()).await?; // check segment count + ctx.evict_table_from_cache("default", "default", "t")?; let qry = "select segment_count as count from fuse_snapshot('default', 't') limit 1"; let stream = execute_query(fixture.ctx(), qry).await?; // after compact, in our case, there should be only 1 segment left assert_eq!(1, check_count(stream).await?); // check block count + ctx.evict_table_from_cache("default", "default", "t")?; let qry = "select block_count as count from fuse_snapshot('default', 't') limit 1"; let stream = execute_query(fixture.ctx(), qry).await?; assert_eq!(num_inserts as u64, check_count(stream).await?); @@ -124,11 +127,13 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { append_rows(ctx.clone(), num_inserts).await?; // check count + ctx.evict_table_from_cache("default", "default", "t")?; let count_qry = "select count(*) from t"; let stream = execute_query(fixture.ctx(), count_qry).await?; assert_eq!(9, check_count(stream).await?); // compact segment + ctx.evict_table_from_cache("default", "default", "t")?; let table = catalog .get_table(ctx.get_tenant().as_str(), "default", "t") .await?; @@ -144,6 +149,7 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { mutator.try_commit(table.clone()).await?; // check segment count + ctx.evict_table_from_cache("default", "default", "t")?; let count_seg = "select segment_count as count from fuse_snapshot('default', 't') limit 1"; let stream = execute_query(fixture.ctx(), count_seg).await?; // after compact, in our case, there should be only 1 + num_inserts segments left @@ -151,6 +157,7 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { assert_eq!(1 + num_inserts as u64, check_count(stream).await?); // check block count + ctx.evict_table_from_cache("default", "default", "t")?; let count_block = "select block_count as count from fuse_snapshot('default', 't') limit 1"; let stream = execute_query(fixture.ctx(), count_block).await?; assert_eq!(num_inserts as u64 * 2, check_count(stream).await?); @@ -181,11 +188,13 @@ async fn test_compact_segment_unresolvable_conflict() -> Result<()> { append_rows(ctx.clone(), num_inserts).await?; // check count + ctx.evict_table_from_cache("default", "default", "t")?; let count_qry = "select count(*) from t"; let stream = execute_query(fixture.ctx(), count_qry).await?; assert_eq!(num_inserts as u64, check_count(stream).await?); // try compact segment + ctx.evict_table_from_cache("default", "default", "t")?; let table = catalog .get_table(ctx.get_tenant().as_str(), "default", "t") .await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index 8fb4b62e37e3c..d3e300fe3bdc3 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -44,6 +44,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { let ctx = fixture.ctx(); // setup + ctx.evict_table_from_cache("default", "default", "t")?; let create_tbl_command = "create table t(c int)"; execute_command(ctx.clone(), create_tbl_command).await?; @@ -51,6 +52,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { let num_inserts = 3; append_rows(ctx.clone(), num_inserts).await?; + ctx.evict_table_from_cache("default", "default", "t")?; let statistics_sql = "analyze table default.t"; execute_command(ctx.clone(), statistics_sql).await?; @@ -59,6 +61,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { .await?; // check count + ctx.evict_table_from_cache("default", "default", "t")?; let count_qry = "select count(*) from t"; let stream = execute_query(fixture.ctx(), count_qry).await?; assert_eq!(num_inserts, query_count(stream).await? as usize); @@ -68,9 +71,11 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { // append the same values again, and ndv does changed. append_rows(ctx.clone(), num_inserts).await?; + ctx.evict_table_from_cache("default", "default", "t")?; execute_command(ctx.clone(), statistics_sql).await?; // check count + ctx.evict_table_from_cache("default", "default", "t")?; let count_qry = "select count(*) from t"; let stream = execute_query(fixture.ctx(), count_qry).await?; assert_eq!(num_inserts * 2, query_count(stream).await? as usize); @@ -78,12 +83,14 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { check_column_ndv_statistics(table.clone(), expected.clone()).await?; // delete + ctx.evict_table_from_cache("default", "default", "t")?; let query = "delete from default.t where c=1"; let mut planner = Planner::new(ctx.clone()); let (plan, _) = planner.plan_sql(query).await?; if let Plan::Delete(delete) = plan { do_deletion(ctx.clone(), *delete).await?; } + ctx.evict_table_from_cache("default", "default", "t")?; execute_command(ctx.clone(), statistics_sql).await?; // check count: delete not affect counts diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 278b57e438ac6..fbce35dce4b63 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -1010,17 +1010,22 @@ impl Binder { } #[async_backtrace::framed] - pub(crate) async fn resolve_data_source( + pub async fn resolve_data_source( &self, - tenant: &str, + _tenant: &str, catalog_name: &str, database_name: &str, table_name: &str, travel_point: &Option, ) -> Result> { - // Resolve table with catalog - let catalog = self.catalogs.get_catalog(tenant, catalog_name).await?; - let mut table_meta = catalog.get_table(tenant, database_name, table_name).await?; + // Resolve table with ctx + // for example: select * from t1 join (select * from t1 as t2 where a > 1 and a < 13); + // we will invoke here twice for t1, so in the past, we use catalog every time to get the + // newest snapshot, we can't get consistent snapshot + let mut table_meta = self + .ctx + .get_table(catalog_name, database_name, table_name) + .await?; if let Some(tp) = travel_point { table_meta = table_meta.navigate_to(tp).await?;