From 9cbbe54ce79326da2c98ad1518debbf132ae8063 Mon Sep 17 00:00:00 2001 From: coldWater Date: Mon, 20 Jan 2025 15:38:50 +0800 Subject: [PATCH 1/5] fix --- .../processors/transforms/transform_sort_merge.rs | 9 ++++++--- .../transforms/transform_sort_merge_base.rs | 3 ++- .../transforms/transform_sort_merge_limit.rs | 1 - .../tests/it/pipelines/transforms/sort/spill.rs | 1 + .../base/03_common/03_0004_select_order_by.test | 12 ++++++++++++ 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index 884c725a0aaee..dc3a7b9fec72c 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -43,7 +43,7 @@ use crate::processors::sort::Merger; pub struct TransformSortMerge { schema: DataSchemaRef, enable_loser_tree: bool, - + limit: Option, block_size: usize, buffer: Vec>, @@ -62,10 +62,12 @@ impl TransformSortMerge { _sort_desc: Arc>, block_size: usize, enable_loser_tree: bool, + limit: Option, ) -> Self { TransformSortMerge { schema, enable_loser_tree, + limit, block_size, buffer: vec![], aborting: Arc::new(AtomicBool::new(false)), @@ -171,7 +173,8 @@ impl TransformSortMerge { let streams = self.buffer.drain(..).collect::>(); let mut result = Vec::with_capacity(size_hint); - let mut merger = Merger::::create(self.schema.clone(), streams, batch_size, None); + let mut merger = + Merger::::create(self.schema.clone(), streams, batch_size, self.limit); while let Some(block) = merger.next_block()? { if unlikely(self.aborting.load(Ordering::Relaxed)) { @@ -218,7 +221,7 @@ pub fn sort_merge( 0, 0, sort_spilling_batch_bytes, - MergeSortCommonImpl::create(schema, sort_desc, block_size, enable_loser_tree), + MergeSortCommonImpl::create(schema, sort_desc, block_size, enable_loser_tree, None), )?; for block in data_blocks { processor.transform(block)?; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs index 16066059ad705..6ed6ef6e4f184 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs @@ -337,7 +337,7 @@ impl TransformSortMergeBuilder { !self.schema.has_field(ORDER_COL_NAME) }); - if self.limit.is_some() { + if self.limit.map(|limit| limit < 10000).unwrap_or_default() { self.build_sort_limit() } else { self.build_sort() @@ -400,6 +400,7 @@ impl TransformSortMergeBuilder { self.sort_desc, self.block_size, self.enable_loser_tree, + self.limit, ), )?, )) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs index 2f063d4010963..0217f1cb44064 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs @@ -127,7 +127,6 @@ impl CursorOrder for LocalCursorOrder { impl TransformSortMergeLimit { pub fn create(block_size: usize, limit: usize) -> Self { - debug_assert!(limit <= 10000, "Too large sort merge limit: {}", limit); TransformSortMergeLimit { heap: FixedHeap::new(limit), buffer: HashMap::with_capacity(limit), diff --git a/src/query/service/tests/it/pipelines/transforms/sort/spill.rs b/src/query/service/tests/it/pipelines/transforms/sort/spill.rs index 91fb7ee03e9d0..cc2b856774d67 100644 --- a/src/query/service/tests/it/pipelines/transforms/sort/spill.rs +++ b/src/query/service/tests/it/pipelines/transforms/sort/spill.rs @@ -71,6 +71,7 @@ fn create_sort_spill_pipeline( sort_desc.clone(), block_size, enable_loser_tree, + None, ), ) })?; diff --git a/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test b/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test index 56bfef5b164ff..fc0c18df43cd5 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test +++ b/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test @@ -113,6 +113,18 @@ INSERT INTO t3 VALUES(1, [1,2,3]), (2, [1,2,4]), (3, []), (4, [3,4,5]), (5, [4]) statement ok DROP TABLE t3 +query I +SELECT number FROM numbers(3) ORDER BY number LIMIT 214748364900; +---- +0 +1 +2 + +query I +SELECT number FROM numbers(100000) ORDER BY number LIMIT 1 OFFSET 50000; +---- +50000 + query I SELECT number FROM numbers(10000) ORDER BY number LIMIT 3 ---- From 80fb337214120fca2cc9263dab9270f121c69902 Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 21 Jan 2025 15:16:04 +0800 Subject: [PATCH 2/5] update --- .../src/pipelines/builders/builder_window.rs | 4 +- src/query/settings/src/settings_default.rs | 7 + .../settings/src/settings_getter_setter.rs | 4 + .../sql/src/planner/optimizer/optimizer.rs | 7 + .../sql/src/planner/optimizer/rule/factory.rs | 24 +-- .../rewrite/rule_push_down_limit_aggregate.rs | 159 +++++++++--------- .../rule/rewrite/rule_push_down_limit_scan.rs | 29 ++-- .../rewrite/rule_push_down_limit_window.rs | 43 ++--- src/query/sql/src/planner/planner.rs | 1 + .../03_common/03_0004_select_order_by.test | 12 -- .../03_common/03_0042_select_large_limit.test | 13 ++ 11 files changed, 173 insertions(+), 130 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index 4205eb35cc564..cd19d1ea9d061 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -170,7 +170,9 @@ impl PipelineBuilder { }) .collect::>>()?; - if let Some(top_n) = &window_partition.top_n { + if let Some(top_n) = &window_partition.top_n + && top_n.top < 10000 + { self.main_pipeline.exchange( num_processors, WindowPartitionTopNExchange::create( diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index aa3d2da50b073..7fa04f09c6080 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -324,6 +324,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("max_push_down_limit", DefaultSettingValue { + value: UserSettingValue::UInt64(10000), + desc: "Sets the maximum number of rows limit that can be pushed down to the leaf operator.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("join_spilling_memory_ratio", DefaultSettingValue { value: UserSettingValue::UInt64(60), desc: "Sets the maximum memory ratio in bytes that hash join can use before spilling data to storage during query execution, 0 is unlimited", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index a7524758608c3..0988780ec6351 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -319,6 +319,10 @@ impl Settings { Ok(self.unchecked_try_get_u64("disable_join_reorder")? != 0) } + pub fn get_max_push_down_limit(&self) -> Result { + Ok(self.try_get_u64("max_push_down_limit")? as usize) + } + pub fn get_join_spilling_memory_ratio(&self) -> Result { Ok(self.try_get_u64("join_spilling_memory_ratio")? as usize) } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 03257f9738030..de9ebc502e301 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -71,6 +71,7 @@ pub struct OptimizerContext { pub(crate) enable_distributed_optimization: bool, enable_join_reorder: bool, enable_dphyp: bool, + pub(crate) max_push_down_limit: usize, planning_agg_index: bool, #[educe(Debug(ignore))] pub(crate) sample_executor: Option>, @@ -85,6 +86,7 @@ impl OptimizerContext { enable_distributed_optimization: false, enable_join_reorder: true, enable_dphyp: true, + max_push_down_limit: 10000, sample_executor: None, planning_agg_index: false, } @@ -114,6 +116,11 @@ impl OptimizerContext { self.planning_agg_index = true; self } + + pub fn with_max_push_down_limit(mut self, max_push_down_limit: usize) -> Self { + self.max_push_down_limit = max_push_down_limit; + self + } } /// A recursive optimizer that will apply the given rules recursively. diff --git a/src/query/sql/src/planner/optimizer/rule/factory.rs b/src/query/sql/src/planner/optimizer/rule/factory.rs index 0aa1fd27b4fff..47c16fa62e7d6 100644 --- a/src/query/sql/src/planner/optimizer/rule/factory.rs +++ b/src/query/sql/src/planner/optimizer/rule/factory.rs @@ -56,8 +56,6 @@ use crate::optimizer::OptimizerContext; pub struct RuleFactory; -pub const MAX_PUSH_DOWN_LIMIT: usize = 10000; - impl RuleFactory { pub fn create_rule(id: RuleID, ctx: OptimizerContext) -> Result { match id { @@ -74,22 +72,24 @@ impl RuleFactory { RuleID::PushDownFilterProjectSet => Ok(Box::new(RulePushDownFilterProjectSet::new())), RuleID::PushDownLimit => Ok(Box::new(RulePushDownLimit::new(ctx.metadata))), RuleID::PushDownLimitUnion => Ok(Box::new(RulePushDownLimitUnion::new())), - RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new())), + RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new( + ctx.max_push_down_limit, + ))), RuleID::PushDownSortScan => Ok(Box::new(RulePushDownSortScan::new())), RuleID::PushDownSortEvalScalar => { Ok(Box::new(RulePushDownSortEvalScalar::new(ctx.metadata))) } RuleID::PushDownLimitOuterJoin => Ok(Box::new(RulePushDownLimitOuterJoin::new())), RuleID::PushDownLimitEvalScalar => Ok(Box::new(RulePushDownLimitEvalScalar::new())), - RuleID::PushDownLimitSort => { - Ok(Box::new(RulePushDownLimitSort::new(MAX_PUSH_DOWN_LIMIT))) - } - RuleID::PushDownLimitWindow => { - Ok(Box::new(RulePushDownLimitWindow::new(MAX_PUSH_DOWN_LIMIT))) - } - RuleID::RulePushDownRankLimitAggregate => { - Ok(Box::new(RulePushDownRankLimitAggregate::new())) - } + RuleID::PushDownLimitSort => Ok(Box::new(RulePushDownLimitSort::new( + ctx.max_push_down_limit, + ))), + RuleID::PushDownLimitWindow => Ok(Box::new(RulePushDownLimitWindow::new( + ctx.max_push_down_limit, + ))), + RuleID::RulePushDownRankLimitAggregate => Ok(Box::new( + RulePushDownRankLimitAggregate::new(ctx.max_push_down_limit), + )), RuleID::PushDownFilterAggregate => Ok(Box::new(RulePushDownFilterAggregate::new())), RuleID::PushDownFilterWindow => Ok(Box::new(RulePushDownFilterWindow::new())), RuleID::PushDownFilterWindowTopN => { diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs index aaf241cd5e63f..60723b67d2b51 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs @@ -41,10 +41,11 @@ use crate::plans::SortItem; pub struct RulePushDownRankLimitAggregate { id: RuleID, matchers: Vec, + max_limit: usize, } impl RulePushDownRankLimitAggregate { - pub fn new() -> Self { + pub fn new(max_limit: usize) -> Self { Self { id: RuleID::RulePushDownRankLimitAggregate, matchers: vec![ @@ -73,6 +74,7 @@ impl RulePushDownRankLimitAggregate { }], }, ], + max_limit, } } @@ -84,40 +86,45 @@ impl RulePushDownRankLimitAggregate { state: &mut TransformResult, ) -> databend_common_exception::Result<()> { let limit: Limit = s_expr.plan().clone().try_into()?; - if let Some(mut count) = limit.limit { - count += limit.offset; - let agg = s_expr.child(0)?; - let mut agg_limit: Aggregate = agg.plan().clone().try_into()?; - - let sort_items = agg_limit - .group_items - .iter() - .map(|g| SortItem { - index: g.index, - asc: true, - nulls_first: false, - }) - .collect::>(); - agg_limit.rank_limit = Some((sort_items.clone(), count)); - - let sort = Sort { - items: sort_items.clone(), - limit: Some(count), - after_exchange: None, - pre_projection: None, - window_partition: None, - }; - - let agg = SExpr::create_unary( - Arc::new(RelOperator::Aggregate(agg_limit)), - Arc::new(agg.child(0)?.clone()), - ); - let sort = SExpr::create_unary(Arc::new(RelOperator::Sort(sort)), agg.into()); - let mut result = s_expr.replace_children(vec![Arc::new(sort)]); - - result.set_applied_rule(&self.id); - state.add_result(result); - } + let Some(mut count) = limit.limit else { + return Ok(()); + }; + count += limit.offset; + let agg = s_expr.child(0)?; + let mut agg_limit: Aggregate = agg.plan().clone().try_into()?; + + let sort_items = agg_limit + .group_items + .iter() + .map(|g| SortItem { + index: g.index, + asc: true, + nulls_first: false, + }) + .collect::>(); + agg_limit.rank_limit = Some((sort_items.clone(), count)); + + let sort = Sort { + items: sort_items.clone(), + limit: if count < self.max_limit { + Some(count) + } else { + None + }, + after_exchange: None, + pre_projection: None, + window_partition: None, + }; + + let agg = SExpr::create_unary( + Arc::new(RelOperator::Aggregate(agg_limit)), + Arc::new(agg.child(0)?.clone()), + ); + let sort = SExpr::create_unary(Arc::new(RelOperator::Sort(sort)), agg.into()); + let mut result = s_expr.replace_children(vec![Arc::new(sort)]); + + result.set_applied_rule(&self.id); + state.add_result(result); Ok(()) } @@ -137,53 +144,55 @@ impl RulePushDownRankLimitAggregate { _ => return Ok(()), }; + let Some(limit) = sort.limit else { + return Ok(()); + }; + let mut agg_limit: Aggregate = agg_limit_expr.plan().clone().try_into()?; - if let Some(limit) = sort.limit { - let is_order_subset = sort - .items - .iter() - .all(|k| agg_limit.group_items.iter().any(|g| g.index == k.index)); + let is_order_subset = sort + .items + .iter() + .all(|k| agg_limit.group_items.iter().any(|g| g.index == k.index)); + if !is_order_subset { + return Ok(()); + } - if !is_order_subset { - return Ok(()); - } - let mut sort_items = Vec::with_capacity(agg_limit.group_items.len()); - let mut not_found_sort_items = vec![]; - for i in 0..agg_limit.group_items.len() { - let group_item = &agg_limit.group_items[i]; - if let Some(sort_item) = sort.items.iter().find(|k| k.index == group_item.index) { - sort_items.push(SortItem { - index: group_item.index, - asc: sort_item.asc, - nulls_first: sort_item.nulls_first, - }); - } else { - not_found_sort_items.push(SortItem { - index: group_item.index, - asc: true, - nulls_first: false, - }); - } + let mut sort_items = Vec::with_capacity(agg_limit.group_items.len()); + let mut not_found_sort_items = vec![]; + for i in 0..agg_limit.group_items.len() { + let group_item = &agg_limit.group_items[i]; + if let Some(sort_item) = sort.items.iter().find(|k| k.index == group_item.index) { + sort_items.push(SortItem { + index: group_item.index, + asc: sort_item.asc, + nulls_first: sort_item.nulls_first, + }); + } else { + not_found_sort_items.push(SortItem { + index: group_item.index, + asc: true, + nulls_first: false, + }); } - sort_items.extend(not_found_sort_items); + } + sort_items.extend(not_found_sort_items); - agg_limit.rank_limit = Some((sort_items, limit)); + agg_limit.rank_limit = Some((sort_items, limit)); - let agg = SExpr::create_unary( - Arc::new(RelOperator::Aggregate(agg_limit)), - Arc::new(agg_limit_expr.child(0)?.clone()), - ); + let agg = SExpr::create_unary( + Arc::new(RelOperator::Aggregate(agg_limit)), + Arc::new(agg_limit_expr.child(0)?.clone()), + ); - let mut result = if has_eval_scalar { - let eval_scalar = s_expr.child(0)?.replace_children(vec![Arc::new(agg)]); - s_expr.replace_children(vec![Arc::new(eval_scalar)]) - } else { - s_expr.replace_children(vec![Arc::new(agg)]) - }; - result.set_applied_rule(&self.id); - state.add_result(result); - } + let mut result = if has_eval_scalar { + let eval_scalar = s_expr.child(0)?.replace_children(vec![Arc::new(agg)]); + s_expr.replace_children(vec![Arc::new(eval_scalar)]) + } else { + s_expr.replace_children(vec![Arc::new(agg)]) + }; + result.set_applied_rule(&self.id); + state.add_result(result); Ok(()) } } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs index ed7f35a1b7f16..ed2e0a50a1ec3 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs @@ -38,10 +38,11 @@ use crate::plans::Scan; pub struct RulePushDownLimitScan { id: RuleID, matchers: Vec, + max_limit: usize, } impl RulePushDownLimitScan { - pub fn new() -> Self { + pub fn new(max_limit: usize) -> Self { Self { id: RuleID::PushDownLimitScan, matchers: vec![Matcher::MatchOp { @@ -51,6 +52,7 @@ impl RulePushDownLimitScan { children: vec![], }], }], + max_limit, } } } @@ -62,17 +64,22 @@ impl Rule for RulePushDownLimitScan { fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> { let limit: Limit = s_expr.plan().clone().try_into()?; - if let Some(mut count) = limit.limit { - let child = s_expr.child(0)?; - let mut get: Scan = child.plan().clone().try_into()?; - count += limit.offset; - get.limit = Some(get.limit.map_or(count, |c| cmp::max(c, count))); - let get = SExpr::create_leaf(Arc::new(RelOperator::Scan(get))); - - let mut result = s_expr.replace_children(vec![Arc::new(get)]); - result.set_applied_rule(&self.id); - state.add_result(result); + let Some(mut count) = limit.limit else { + return Ok(()); + }; + count += limit.offset; + if count > self.max_limit { + return Ok(()); } + + let child = s_expr.child(0)?; + let mut get: Scan = child.plan().clone().try_into()?; + get.limit = Some(get.limit.map_or(count, |c| cmp::max(c, count))); + let get = SExpr::create_leaf(Arc::new(RelOperator::Scan(get))); + + let mut result = s_expr.replace_children(vec![Arc::new(get)]); + result.set_applied_rule(&self.id); + state.add_result(result); Ok(()) } diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_window.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_window.rs index dc009c733f06f..70dc476a46b0a 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_window.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_window.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp; use std::sync::Arc; use databend_common_exception::Result; @@ -71,26 +70,32 @@ impl Rule for RulePushDownLimitWindow { fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> { let limit: Limit = s_expr.plan().clone().try_into()?; - if let Some(mut count) = limit.limit { - count += limit.offset; - let window = s_expr.child(0)?; - let mut window_limit: LogicalWindow = window.plan().clone().try_into()?; - if should_apply(window.child(0)?, &window_limit)? { - let limit = window_limit.limit.map_or(count, |c| cmp::max(c, count)); + let Some(mut count) = limit.limit else { + return Ok(()); + }; + count += limit.offset; + if count > self.max_limit { + return Ok(()); + } + let window = s_expr.child(0)?; + let mut window_limit: LogicalWindow = window.plan().clone().try_into()?; + let limit = window_limit.limit.map_or(count, |c| c.max(count)); + if limit > self.max_limit { + return Ok(()); + } + if !should_apply(window.child(0)?, &window_limit)? { + return Ok(()); + } - if limit <= self.max_limit { - window_limit.limit = Some(limit); - let sort = SExpr::create_unary( - Arc::new(RelOperator::Window(window_limit)), - Arc::new(window.child(0)?.clone()), - ); + window_limit.limit = Some(limit); + let sort = SExpr::create_unary( + Arc::new(RelOperator::Window(window_limit)), + Arc::new(window.child(0)?.clone()), + ); + let mut result = s_expr.replace_children(vec![Arc::new(sort)]); + result.set_applied_rule(&self.id); + state.add_result(result); - let mut result = s_expr.replace_children(vec![Arc::new(sort)]); - result.set_applied_rule(&self.id); - state.add_result(result); - } - } - } Ok(()) } diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 5eeb3c9455b35..57192e74d04de 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -275,6 +275,7 @@ impl Planner { .with_enable_distributed_optimization(!self.ctx.get_cluster().is_empty()) .with_enable_join_reorder(unsafe { !settings.get_disable_join_reorder()? }) .with_enable_dphyp(settings.get_enable_dphyp()?) + .with_max_push_down_limit(settings.get_max_push_down_limit()?) .with_sample_executor(self.query_executor.clone()); let optimized_plan = optimize(opt_ctx, plan).await?; diff --git a/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test b/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test index fc0c18df43cd5..56bfef5b164ff 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test +++ b/tests/sqllogictests/suites/base/03_common/03_0004_select_order_by.test @@ -113,18 +113,6 @@ INSERT INTO t3 VALUES(1, [1,2,3]), (2, [1,2,4]), (3, []), (4, [3,4,5]), (5, [4]) statement ok DROP TABLE t3 -query I -SELECT number FROM numbers(3) ORDER BY number LIMIT 214748364900; ----- -0 -1 -2 - -query I -SELECT number FROM numbers(100000) ORDER BY number LIMIT 1 OFFSET 50000; ----- -50000 - query I SELECT number FROM numbers(10000) ORDER BY number LIMIT 3 ---- diff --git a/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test b/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test index 308219a38daa0..bb177f8047621 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test +++ b/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test @@ -24,5 +24,18 @@ SELECT id, v FROM t1 order by id LIMIT 100000000; 3 616161 4 616161 +query I +SELECT id FROM t1 GROUP BY id LIMIT 2147483649 OFFSET 0; +---- +1 +2 +3 +4 + +query I +SELECT number FROM numbers(100000) GROUP BY number LIMIT 1 OFFSET 50000; +---- +50000 + statement ok DROP DATABASE db_large_limit From 000910e518b4059ac59844e5bf4161aa7d3cf947 Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 21 Jan 2025 15:28:04 +0800 Subject: [PATCH 3/5] update --- .../rule/rewrite/rule_push_down_limit_aggregate.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs index 60723b67d2b51..9052964a38a05 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs @@ -90,6 +90,9 @@ impl RulePushDownRankLimitAggregate { return Ok(()); }; count += limit.offset; + if count > self.max_limit { + return Ok(()); + } let agg = s_expr.child(0)?; let mut agg_limit: Aggregate = agg.plan().clone().try_into()?; @@ -106,11 +109,7 @@ impl RulePushDownRankLimitAggregate { let sort = Sort { items: sort_items.clone(), - limit: if count < self.max_limit { - Some(count) - } else { - None - }, + limit: Some(count), after_exchange: None, pre_projection: None, window_partition: None, From c6854e97c66a7d9b16329006cbda1f0c173f0099 Mon Sep 17 00:00:00 2001 From: coldWater Date: Tue, 21 Jan 2025 21:02:41 +0800 Subject: [PATCH 4/5] fix --- .../suites/base/03_common/03_0042_select_large_limit.test | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test b/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test index bb177f8047621..357fef75a3def 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test +++ b/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test @@ -32,10 +32,5 @@ SELECT id FROM t1 GROUP BY id LIMIT 2147483649 OFFSET 0; 3 4 -query I -SELECT number FROM numbers(100000) GROUP BY number LIMIT 1 OFFSET 50000; ----- -50000 - statement ok DROP DATABASE db_large_limit From cd91dfcf643e9b69123badac61d98c4377e30db6 Mon Sep 17 00:00:00 2001 From: coldWater Date: Wed, 22 Jan 2025 11:07:00 +0800 Subject: [PATCH 5/5] fix --- src/query/sql/src/planner/optimizer/rule/factory.rs | 4 +--- .../optimizer/rule/rewrite/rule_push_down_limit_scan.rs | 7 +------ .../suites/base/03_common/03_0042_select_large_limit.test | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/query/sql/src/planner/optimizer/rule/factory.rs b/src/query/sql/src/planner/optimizer/rule/factory.rs index 47c16fa62e7d6..bee836dc79dae 100644 --- a/src/query/sql/src/planner/optimizer/rule/factory.rs +++ b/src/query/sql/src/planner/optimizer/rule/factory.rs @@ -72,9 +72,7 @@ impl RuleFactory { RuleID::PushDownFilterProjectSet => Ok(Box::new(RulePushDownFilterProjectSet::new())), RuleID::PushDownLimit => Ok(Box::new(RulePushDownLimit::new(ctx.metadata))), RuleID::PushDownLimitUnion => Ok(Box::new(RulePushDownLimitUnion::new())), - RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new( - ctx.max_push_down_limit, - ))), + RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new())), RuleID::PushDownSortScan => Ok(Box::new(RulePushDownSortScan::new())), RuleID::PushDownSortEvalScalar => { Ok(Box::new(RulePushDownSortEvalScalar::new(ctx.metadata))) diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs index ed2e0a50a1ec3..129350c0dbde0 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs @@ -38,11 +38,10 @@ use crate::plans::Scan; pub struct RulePushDownLimitScan { id: RuleID, matchers: Vec, - max_limit: usize, } impl RulePushDownLimitScan { - pub fn new(max_limit: usize) -> Self { + pub fn new() -> Self { Self { id: RuleID::PushDownLimitScan, matchers: vec![Matcher::MatchOp { @@ -52,7 +51,6 @@ impl RulePushDownLimitScan { children: vec![], }], }], - max_limit, } } } @@ -68,9 +66,6 @@ impl Rule for RulePushDownLimitScan { return Ok(()); }; count += limit.offset; - if count > self.max_limit { - return Ok(()); - } let child = s_expr.child(0)?; let mut get: Scan = child.plan().clone().try_into()?; diff --git a/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test b/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test index 357fef75a3def..5274683912222 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test +++ b/tests/sqllogictests/suites/base/03_common/03_0042_select_large_limit.test @@ -25,7 +25,7 @@ SELECT id, v FROM t1 order by id LIMIT 100000000; 4 616161 query I -SELECT id FROM t1 GROUP BY id LIMIT 2147483649 OFFSET 0; +SELECT id FROM t1 GROUP BY id ORDER BY id LIMIT 2147483649 OFFSET 0; ---- 1 2