// Patch overview (this text sits before the first "diff --git" header and is not part of any hunk):
//   - sql/analyzer/indexed_joins.go: addHashJoins now calls m.MemoizeHashJoin instead of
//     building a memo.HashJoin inline and prepending it to the expression group.
//   - sql/analyzer/pushdown.go: the pushFilters case that calls pushdownFiltersToAboveTable
//     now also matches sql.TableFunction.
//   - sql/memo/memo.go: MemoizeLookupJoin returns grp without adding a join when
//     right.RelProps.reqIdxCols contains columns outside lookup.Index.set; the new
//     MemoizeHashJoin returns grp without adding a join when join.Right.RelProps.reqIdxCols
//     is non-empty, and otherwise prepends the hash join to grp (or creates a new group
//     when grp is nil).
//   - sql/memo/rel_props.go: relProps gains reqIdxCols, filled in newRelProps from
//     RequiredPredicates() and copied from the child group for Distinct, Project and Filter
//     by populateRequiredIdxCols.
//   - sql/tables.go: adds the IndexRequired interface (IndexAddressable plus RequiredPredicates).
// NOTE(review): both RequiredPredicates loops add firstCol + sql.ColumnId(i) without checking
//   the value returned by IndexOfColName -- confirm a column name that is not in the schema
//   cannot produce a wrong column id here.
// NOTE(review): the first loop type-asserts the underlying table to sql.IndexRequired while
//   the second asserts n; which n is in scope at that point is not visible in this hunk --
//   confirm the asymmetry is intended.
diff --git a/sql/analyzer/indexed_joins.go b/sql/analyzer/indexed_joins.go index 00c62c3fc9..d55ab239b9 100644 --- a/sql/analyzer/indexed_joins.go +++ b/sql/analyzer/indexed_joins.go @@ -731,13 +731,8 @@ func addHashJoins(m *memo.Memo) error { case *memo.RecursiveTable: return nil } - rel := &memo.HashJoin{ - JoinBase: join.Copy(), - LeftAttrs: toExpr, - RightAttrs: fromExpr, - } - rel.Op = rel.Op.AsHash() - e.Group().Prepend(rel) + + m.MemoizeHashJoin(e.Group(), join, toExpr, fromExpr) return nil }) } diff --git a/sql/analyzer/pushdown.go b/sql/analyzer/pushdown.go index 7986118680..18ba0b3c81 100644 --- a/sql/analyzer/pushdown.go +++ b/sql/analyzer/pushdown.go @@ -50,7 +50,7 @@ func pushFilters(ctx *sql.Context, a *Analyzer, n sql.Node, scope *plan.Scope, s return plan.NewFilter(expression.JoinAnd(node.Expression, f.Expression), f.Child), transform.NewTree, nil } return node, transform.SameTree, nil - case *plan.TableAlias, *plan.ResolvedTable, *plan.ValueDerivedTable: + case *plan.TableAlias, *plan.ResolvedTable, *plan.ValueDerivedTable, sql.TableFunction: table, same, err := pushdownFiltersToAboveTable(ctx, a, node.(sql.NameableNode), scope, filters) if err != nil { return nil, transform.SameTree, err diff --git a/sql/memo/memo.go b/sql/memo/memo.go index 77e86e4802..d4907b6668 100644 --- a/sql/memo/memo.go +++ b/sql/memo/memo.go @@ -143,6 +143,11 @@ func (m *Memo) MemoizeInnerJoin(grp, left, right *ExprGroup, op plan.JoinType, f } func (m *Memo) MemoizeLookupJoin(grp, left, right *ExprGroup, op plan.JoinType, filter []sql.Expression, lookup *IndexScan) *ExprGroup { + if right.RelProps.reqIdxCols.Difference(lookup.Index.set).Len() > 0 { + // the index lookup does not cover the requested RHS indexScan columns, + // so this physical plan is invalid. 
+ return grp + } newJoin := &LookupJoin{ JoinBase: &JoinBase{ relBase: &relBase{}, @@ -167,6 +172,28 @@ func (m *Memo) MemoizeLookupJoin(grp, left, right *ExprGroup, op plan.JoinType, return grp } +func (m *Memo) MemoizeHashJoin(grp *ExprGroup, join *JoinBase, toExpr, fromExpr []sql.Expression) *ExprGroup { + if join.Right.RelProps.reqIdxCols.Len() > 0 { + // HASH_JOIN's RHS will be a table scan, so this physical + // plan will not provide the requested indexScan + return grp + } + newJoin := &HashJoin{ + JoinBase: join.Copy(), + LeftAttrs: toExpr, + RightAttrs: fromExpr, + } + newJoin.Op = newJoin.Op.AsHash() + + if grp == nil { + return m.NewExprGroup(newJoin) + } + newJoin.g = grp + grp.Prepend(newJoin) + + return grp +} + // MemoizeConcatLookupJoin creates a lookup join over a set of disjunctions. // If a LOOKUP_JOIN simulates x = v1, a concat lookup performs x in (v1, v2, v3, ...) func (m *Memo) MemoizeConcatLookupJoin(grp, left, right *ExprGroup, op plan.JoinType, filter []sql.Expression, lookups []*IndexScan) *ExprGroup { diff --git a/sql/memo/rel_props.go b/sql/memo/rel_props.go index 0c32f65a49..357e9b0255 100644 --- a/sql/memo/rel_props.go +++ b/sql/memo/rel_props.go @@ -33,6 +33,7 @@ type relProps struct { fds *sql.FuncDepSet outputCols sql.ColSet + reqIdxCols sql.ColSet inputTables sql.FastIntSet outputTables sql.FastIntSet tableNodes []plan.TableIdNode @@ -64,6 +65,29 @@ func newRelProps(rel RelExpr) *relProps { n := r.TableIdNode() if len(n.Schema()) == n.Columns().Len() { p.outputCols = r.TableIdNode().Columns() + + firstCol, _ := n.Columns().Next(1) + + var table sql.Table + switch n := n.(type) { + case *plan.TableAlias: + if tn, ok := n.Child.(sql.TableNode); ok { + table = tn.UnderlyingTable() + } + case sql.TableNode: + table = n.UnderlyingTable() + } + var requiredIndexCols sql.ColSet + if table != nil { + if irt, ok := table.(sql.IndexRequired); ok { + cols := irt.RequiredPredicates() + for _, c := range cols { + i := 
n.Schema().IndexOfColName(c) + requiredIndexCols.Add(firstCol + sql.ColumnId(i)) + } + } + } + p.reqIdxCols = requiredIndexCols } else { // if the table is projected, capture subset of column ids var tw sql.TableNode @@ -86,10 +110,21 @@ func newRelProps(rel RelExpr) *relProps { colset.Add(firstCol + sql.ColumnId(i)) } p.outputCols = colset + + var requiredIndexCols sql.ColSet + if irt, ok := n.(sql.IndexRequired); ok { + cols := irt.RequiredPredicates() + for _, c := range cols { + i := sch.IndexOfColName(c) + requiredIndexCols.Add(firstCol + sql.ColumnId(i)) + } + } + p.reqIdxCols = requiredIndexCols } default: } + p.populateRequiredIdxCols() p.populateFds() p.stat = statsForRel(rel) p.populateOutputTables() @@ -117,6 +152,17 @@ func (p *relProps) GetStats() sql.Statistic { return p.stat } +func (p *relProps) populateRequiredIdxCols() { + switch rel := p.grp.First.(type) { + case *Distinct: + p.reqIdxCols = rel.Child.RelProps.reqIdxCols + case *Project: + p.reqIdxCols = rel.Child.RelProps.reqIdxCols + case *Filter: + p.reqIdxCols = rel.Child.RelProps.reqIdxCols + } +} + func (p *relProps) populateFds() { var fds *sql.FuncDepSet switch rel := p.grp.First.(type) { diff --git a/sql/tables.go b/sql/tables.go index 4ef2386c78..b8d53ce01b 100644 --- a/sql/tables.go +++ b/sql/tables.go @@ -111,6 +111,15 @@ type IndexAddressable interface { PreciseMatch() bool } +// IndexRequired tables cannot be executed without index lookups on certain +// columns. Join planning uses this interface to maintain plan correctness +// for these nodes. +type IndexRequired interface { + IndexAddressable + // RequiredPredicates returns a list of columns that need IndexedTableAccess + RequiredPredicates() []string +} + // IndexAddressableTable is a table that can be accessed through an index type IndexAddressableTable interface { Table