
fix: fix snapshot consistency bug (#13526)
* add test

* fix test

* remove log

* commit data

* add comments

* try fix

* fix check

* clean cache

* remove vacuum test

* fix tests

* remove useless code

* fix spawn

* remove code

* fix clippy

* add test

* fix check

* fix type

---------

Co-authored-by: Yang Xiufeng <[email protected]>
JackTan25 and youngsofun authored Nov 3, 2023
1 parent 1ffa701 commit 8683889
Showing 11 changed files with 1,083 additions and 482 deletions.
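
The recurring change in this commit is the pattern sketched below: before a test runs a statement whose result depends on table metadata, it first evicts the table from the query context's table cache so the next access resolves a fresh snapshot rather than a stale cached one. A minimal sketch of that pattern, assuming the `TestFixture` / `evict_table_from_cache` API visible in the hunks below (not a verbatim excerpt):

```rust
// Sketch of the eviction-before-access pattern this commit adds to the tests.
// CATALOG and DATABASE are the test constants imported at the top of the file.
fixture
    .ctx()
    .evict_table_from_cache(CATALOG, DATABASE, "t0")?; // drop any cached snapshot of t0
execute_sql(
    fixture.ctx(),
    "INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
)
.await?; // this statement now reads t0's latest table version
```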
81 changes: 80 additions & 1 deletion src/query/ee/tests/it/aggregating_index/index_refresh.rs
@@ -38,6 +38,9 @@ use databend_query::test_kits::TestFixture;
use enterprise_query::test_kits::context::create_ee_query_context;
use futures_util::TryStreamExt;

use crate::aggregating_index::CATALOG;
use crate::aggregating_index::DATABASE;

async fn plan_sql(ctx: Arc<QueryContext>, sql: &str) -> Result<Plan> {
let mut planner = Planner::new(ctx);
let (plan, _) = planner.plan_sql(sql).await?;
@@ -113,12 +116,19 @@ async fn test_refresh_agg_index() -> Result<()> {
let fixture = TestFixture::new_with_ctx(_guard, ctx).await;

// Create table
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(
fixture.ctx(),
"CREATE TABLE t0 (a int, b int, c int) storage_format = 'parquet'",
)
.await?;

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;

// Insert data
execute_sql(
fixture.ctx(),
@@ -129,6 +139,10 @@ async fn test_refresh_agg_index() -> Result<()> {
// Create index
let index_name = "index0";

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;

let index_id = create_index(
fixture.ctx(),
index_name,
@@ -138,6 +152,9 @@
.await?;

// Refresh Index
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, index_name)?;
refresh_index(fixture.ctx(), index_name, None).await?;

let block_path = find_block_path(&root)?.unwrap();
@@ -156,13 +173,19 @@

// Check aggregating index is correct.
{
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let res = execute_sql(
fixture.ctx(),
"SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b",
)
.await?;
let data_blocks: Vec<DataBlock> = res.try_collect().await?;

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let agg_res = execute_sql(
fixture.ctx(),
&format!(
@@ -177,6 +200,9 @@
}

// Insert more data
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(
fixture.ctx(),
"INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
@@ -188,6 +214,9 @@
assert!(blocks.len() > indexes.len());

// Refresh Index again
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
refresh_index(fixture.ctx(), index_name, None).await?;

// check the new added index is correct.
@@ -206,6 +235,9 @@
indexes[0].clone()
};

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let data_blocks: Vec<DataBlock> = execute_sql(
fixture.ctx(),
&format!(
@@ -217,6 +249,9 @@
.try_collect()
.await?;

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let agg_data_blocks: Vec<DataBlock> = execute_sql(
fixture.ctx(),
&format!(
@@ -228,6 +263,9 @@
.try_collect()
.await?;

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
assert_two_blocks_sorted_eq_with_name(
"refresh index again",
&data_blocks,
@@ -259,7 +297,9 @@ async fn test_refresh_agg_index_with_limit() -> Result<()> {

// Create index
let index_name = "index1";

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t1")?;
let index_id = create_index(
fixture.ctx(),
index_name,
@@ -269,19 +309,28 @@
.await?;

// Insert more data
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t1")?;
execute_sql(
fixture.ctx(),
"INSERT INTO t1 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
)
.await?;

fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t1")?;
execute_sql(
fixture.ctx(),
"INSERT INTO t1 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
)
.await?;

// Refresh index with limit 1
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t1")?;
refresh_index(fixture.ctx(), index_name, Some(1)).await?;

let block_path = find_block_path(&root)?.unwrap();
@@ -339,6 +388,9 @@ async fn test_sync_agg_index_after_update() -> Result<()> {
.await?;

// Insert data
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(
ctx.clone(),
"INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
@@ -356,6 +408,9 @@

// Check aggregating index_0 is correct.
{
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let res = execute_sql(
ctx.clone(),
"SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b",
@@ -381,6 +436,9 @@
}

// Update
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(ctx.clone(), "UPDATE t0 SET c = 2 WHERE b = 2").await?;

let first_block = blocks[0].clone();
@@ -394,6 +452,9 @@

// Check aggregating index_0 is correct after update.
{
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let updated_block = blocks
.iter()
.find(|b| !b.eq_ignore_ascii_case(&first_block))
@@ -468,6 +529,9 @@ async fn test_sync_agg_index_after_insert() -> Result<()> {
.await?;

// Insert data
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(
ctx.clone(),
"INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
@@ -489,6 +553,9 @@

// Check aggregating index_0 is correct.
{
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let res = execute_sql(
ctx.clone(),
"SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b",
@@ -515,6 +582,9 @@

// Check aggregating index_1 is correct.
{
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let res = execute_sql(
ctx.clone(),
"SELECT a, SUM_STATE(b) from t0 WHERE c > 1 GROUP BY a",
@@ -540,6 +610,9 @@
}

// Insert more data with insert into ... select ...
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(ctx.clone(), "INSERT INTO t0 SELECT * FROM t0").await?;

let blocks = collect_file_names(&block_path)?;
@@ -581,6 +654,9 @@ async fn test_sync_agg_index_after_copy_into() -> Result<()> {
.await?;

// Copy into data
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
execute_sql(
ctx.clone(),
"COPY INTO books FROM 'https://datafuse-1253727613.cos.ap-hongkong.myqcloud.com/data/books.csv' FILE_FORMAT = (TYPE = CSV);",
@@ -598,6 +674,9 @@

// Check aggregating index_0 is correct.
{
fixture
.ctx()
.evict_table_from_cache(CATALOG, DATABASE, "t0")?;
let res = execute_sql(ctx.clone(), "SELECT MAX_STATE(title) from books").await?;
let data_blocks: Vec<DataBlock> = res.try_collect().await?;

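Since the eviction/execution pair repeats at every call site above, a hypothetical helper (not part of this commit) could bundle the two steps. This sketch reuses only the APIs visible in the diff (`evict_table_from_cache`, `execute_sql`, `try_collect`) and assumes the crate's usual `Result` type and that `fixture.ctx()` yields the context `execute_sql` expects:

```rust
// Hypothetical convenience wrapper, shown only to illustrate the pattern;
// the commit itself inlines these two steps at each call site.
async fn execute_on_fresh_snapshot(
    fixture: &TestFixture,
    table: &str,
    sql: &str,
) -> Result<Vec<DataBlock>> {
    // Evict any cached snapshot so the statement sees the latest table version.
    fixture
        .ctx()
        .evict_table_from_cache(CATALOG, DATABASE, table)?;
    let res = execute_sql(fixture.ctx(), sql).await?;
    res.try_collect().await
}
```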
