Skip to content

Commit

Permalink
add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Dec 20, 2023
1 parent a679cdd commit 8cc7e63
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 33 deletions.
11 changes: 3 additions & 8 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3529,8 +3529,7 @@ impl SchemaApiTestSuite {
dropped_on: None,
updated_on: None,
original_query: "select sum(number) from tb1".to_string(),
query: "select sum(number), tb1._block_name from tb1 group by tb1._block_name"
.to_string(),
query: "select sum(number) from tb1".to_string(),
sync_creation: false,
},
};
Expand Down Expand Up @@ -5543,9 +5542,7 @@ impl SchemaApiTestSuite {
dropped_on: None,
updated_on: None,
original_query: "SELECT a, SUM(b) FROM tb1 WHERE a > 1 GROUP BY b".to_string(),
query:
"SELECT a, SUM(b), tb1._block_name FROM tb1 WHERE a > 1 GROUP BY b, tb1._block_name"
.to_string(),
query: "SELECT a, SUM(b) FROM tb1 WHERE a > 1 GROUP BY b".to_string(),
sync_creation: false,
};

Expand All @@ -5557,9 +5554,7 @@ impl SchemaApiTestSuite {
dropped_on: None,
updated_on: None,
original_query: "SELECT a, SUM(b) FROM tb1 WHERE b > 1 GROUP BY b".to_string(),
query:
"SELECT a, SUM(b), tb1._block_name FROM tb1 WHERE b > 1 GROUP BY b, tb1._block_name"
.to_string(),
query: "SELECT a, SUM(b) FROM tb1 WHERE b > 1 GROUP BY b".to_string(),
sync_creation: false,
};

Expand Down
2 changes: 1 addition & 1 deletion src/meta/proto-conv/tests/it/proto_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ fn new_index_meta() -> mt::IndexMeta {
dropped_on: None,
updated_on: None,
original_query: "SELECT a, sum(b) FROM default.t1 WHERE a > 3 GROUP BY b".to_string(),
query: "SELECT a, SUM(b), t1._block_name FROM default.t1 WHERE a > 3 GROUP BY b, t1._block_name".to_string(),
query: "SELECT a, SUM(b) FROM default.t1 WHERE a > 3 GROUP BY b".to_string(),
sync_creation: false,
}
}
Expand Down
16 changes: 7 additions & 9 deletions src/meta/proto-conv/tests/it/v068_index_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,20 @@ use crate::common;
fn test_decode_v68_index() -> anyhow::Result<()> {
let index_v068 = vec![
8, 7, 16, 1, 26, 23, 50, 48, 49, 53, 45, 48, 51, 45, 48, 57, 32, 50, 48, 58, 48, 48, 58,
48, 57, 32, 85, 84, 67, 42, 87, 83, 69, 76, 69, 67, 84, 32, 97, 44, 32, 83, 85, 77, 40, 98,
41, 44, 32, 116, 49, 46, 95, 98, 108, 111, 99, 107, 95, 110, 97, 109, 101, 32, 70, 82, 79,
77, 32, 100, 101, 102, 97, 117, 108, 116, 46, 116, 49, 32, 87, 72, 69, 82, 69, 32, 97, 32,
62, 32, 51, 32, 71, 82, 79, 85, 80, 32, 66, 89, 32, 98, 44, 32, 116, 49, 46, 95, 98, 108,
111, 99, 107, 95, 110, 97, 109, 101, 66, 55, 83, 69, 76, 69, 67, 84, 32, 97, 44, 32, 115,
117, 109, 40, 98, 41, 32, 70, 82, 79, 77, 32, 100, 101, 102, 97, 117, 108, 116, 46, 116,
49, 32, 87, 72, 69, 82, 69, 32, 97, 32, 62, 32, 51, 32, 71, 82, 79, 85, 80, 32, 66, 89, 32,
98, 160, 6, 68, 168, 6, 24,
48, 57, 32, 85, 84, 67, 42, 55, 83, 69, 76, 69, 67, 84, 32, 97, 44, 32, 83, 85, 77, 40, 98,
41, 32, 70, 82, 79, 77, 32, 100, 101, 102, 97, 117, 108, 116, 46, 116, 49, 32, 87, 72, 69,
82, 69, 32, 97, 32, 62, 32, 51, 32, 71, 82, 79, 85, 80, 32, 66, 89, 32, 98, 66, 55, 83, 69,
76, 69, 67, 84, 32, 97, 44, 32, 115, 117, 109, 40, 98, 41, 32, 70, 82, 79, 77, 32, 100,
101, 102, 97, 117, 108, 116, 46, 116, 49, 32, 87, 72, 69, 82, 69, 32, 97, 32, 62, 32, 51,
32, 71, 82, 79, 85, 80, 32, 66, 89, 32, 98, 160, 6, 68, 168, 6, 24,
];

let want = || {
let table_id = 7;
let index_type = IndexType::AGGREGATING;
let created_on = Utc.with_ymd_and_hms(2015, 3, 9, 20, 0, 9).unwrap();
let original_query = "SELECT a, sum(b) FROM default.t1 WHERE a > 3 GROUP BY b".to_string();
let query = "SELECT a, SUM(b), t1._block_name FROM default.t1 WHERE a > 3 GROUP BY b, t1._block_name".to_string();
let query = "SELECT a, SUM(b) FROM default.t1 WHERE a > 3 GROUP BY b".to_string();

IndexMeta {
table_id,
Expand Down
22 changes: 13 additions & 9 deletions src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,8 @@ impl Binder {
}
let mut original_query = query.clone();
// pass checker, rewrite aggregate function
// The file name and block only correspond to each other at the time of table_scan,
// after multiple transformations, this correspondence does not exist,
// aggregating index needs to know which file the data comes from at the time of final sink
// to generate the index file corresponding to the source table data file,
// so we rewrite the sql here to add `_block_name` to select targets,
// so that we inline the file name into the data block.

// NOTE: if the user already uses `_block_name` in their SQL,
// we do not need to add it and **MUST NOT** drop this column in the sink phase.
// We extract every agg function that appears in the select targets
// and rewrite some agg functions, such as `avg`.
let mut query = query.clone();
// TODO(ariesdevil): unify the checker and rewriter.
let mut agg_index_rewritter = AggregatingIndexRewriter::new(self.dialect);
Expand Down Expand Up @@ -289,6 +282,17 @@ impl Binder {
let tokens = tokenize_sql(&index_meta.query)?;
let (mut stmt, _) = parse_sql(&tokens, self.dialect)?;

// The file name and the block only correspond to each other at table_scan time;
// after multiple transformations this correspondence no longer exists.
// The aggregating index needs to know which file the data comes from at the final sink
// in order to generate the index file corresponding to the source table data file.
// So we rewrite the SQL here to add `_block_name` to the select targets,
// which inlines the file name into the data block.

// NOTE: if the user already uses `_block_name` in their SQL,
// we do not need to add it and **MUST NOT** drop this column in the sink phase.

// This rewriter also rewrites each agg function into its agg state function.
let mut index_rewriter = RefreshAggregatingIndexRewriter::default();
walk_statement_mut(&mut index_rewriter, &mut stmt);

Expand Down
55 changes: 49 additions & 6 deletions src/query/sql/src/planner/semantic/aggregating_index_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl VisitorMut for AggregatingIndexRewriter {
lambda,
..
} if !*distinct
&& args.len() == 1
&& SUPPORTED_AGGREGATING_INDEX_FUNCTIONS
.contains(&&*name.name.to_ascii_lowercase().to_lowercase())
&& window.is_none()
Expand All @@ -75,23 +74,49 @@ impl VisitorMut for AggregatingIndexRewriter {
if name.name.eq_ignore_ascii_case("avg") {
self.extract_avg(args);
} else {
let agg = format!("{}({})", name.name.to_ascii_uppercase(), args[0]);
let agg = format!(
"{}({})",
name.name.to_ascii_uppercase(),
args.iter().map(|arg| arg.to_string()).join(",")
);
self.extracted_aggs.insert(agg);
}
}
Expr::CountAll { window, .. } if window.is_none() => {
self.has_agg_function = true;
self.extracted_aggs.insert("COUNT()".to_string());
}
_ => {}
};
}

fn visit_select_stmt(&mut self, stmt: &mut SelectStmt) {
let SelectStmt { select_list, .. } = stmt;
let SelectStmt {
select_list,
group_by,
..
} = stmt;

// We save the targets' exprs (and aliases) in a hash set, so that any group-by
// item that is not already among the targets can be added to the targets.
let mut select_list_exprs: HashSet<String> = HashSet::new();
select_list.iter().for_each(|target| {
if let SelectTarget::AliasedExpr { expr, alias } = target {
select_list_exprs.insert(expr.to_string());
if let Some(alias) = alias {
select_list_exprs.insert(alias.to_string());
}
}
});
let mut new_select_list: Vec<SelectTarget> = vec![];
for (position, target) in select_list.iter_mut().enumerate() {
walk_select_target_mut(self, target);
if self.has_agg_function {
// if target has agg function, we will extract the func to a hashset
// see `visit_expr` above for detail.
// we save the position of target that has agg function here,
// so that we can skip this target after.
// so that we can skip this target later and replace the skipped
// target with the extracted agg functions.
self.agg_func_positions.insert(position);
self.has_agg_function = false;
}
Expand Down Expand Up @@ -122,6 +147,26 @@ impl VisitorMut for AggregatingIndexRewriter {
}
});

match group_by {
Some(group_by) => match group_by {
GroupBy::Normal(groups) => {
groups.iter().for_each(|expr| {
// If a group-by item is not among the targets, add it to the new select list.
if !select_list_exprs.contains(&expr.to_string()) {
let target = SelectTarget::AliasedExpr {
expr: Box::new(expr.clone()),
alias: None,
};
new_select_list.push(target);
}
});
}
_ => unreachable!(),
},
None => {}
}

// replace the select list with our rewritten new select list.
*select_list = new_select_list;
}
}
Expand Down Expand Up @@ -255,11 +300,9 @@ impl VisitorMut for RefreshAggregatingIndexRewriter {
Expr::FunctionCall {
distinct,
name,
args,
window,
..
} if !*distinct
&& args.len() == 1
&& SUPPORTED_AGGREGATING_INDEX_FUNCTIONS
.contains(&&*name.name.to_ascii_lowercase().to_lowercase())
&& window.is_none() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ AggregateFinal
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [], limit: NONE]
├── aggregating index: [SELECT COUNT(), MAX(a), MIN(a), SUM(a), b FROM test_index_db.t1 GROUP BY b]
├── rewritten query: [selection: [index_col_0 (#0), index_col_3 (#3), index_col_2 (#2), index_col_1 (#1)]]
└── estimated rows: 0.00


Expand Down

0 comments on commit 8cc7e63

Please sign in to comment.