Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions go/vt/vtgate/planbuilder/operators/derived.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

type Derived struct {
Source ops.Operator
*singleSource

Query sqlparser.SelectStatement
Alias string
Expand All @@ -42,13 +42,27 @@ type Derived struct {

var _ ops.PhysicalOperator = (*Derived)(nil)

func newDerived(
src ops.Operator,
stmt sqlparser.SelectStatement,
alias string,
columnAliases sqlparser.Columns,
) ops.Operator {
return &Derived{
singleSource: &singleSource{Source: src},
Query: stmt,
Alias: alias,
ColumnAliases: columnAliases,
}
}

// IPhysical implements the PhysicalOperator interface
func (d *Derived) IPhysical() {}

// Clone implements the Operator interface
func (d *Derived) Clone(inputs []ops.Operator) ops.Operator {
return &Derived{
Source: inputs[0],
singleSource: &singleSource{Source: inputs[0]},
Query: d.Query,
Alias: d.Alias,
ColumnAliases: sqlparser.CloneColumns(d.ColumnAliases),
Expand Down Expand Up @@ -102,11 +116,6 @@ func (d *Derived) IsMergeable(ctx *plancontext.PlanningContext) bool {
return isMergeable(ctx, d.Query, d)
}

// Inputs implements the Operator interface
func (d *Derived) Inputs() []ops.Operator {
return []ops.Operator{d.Source}
}

func (d *Derived) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) {
if _, isUNion := d.Source.(*Union); isUNion {
// If we have a derived table on top of a UNION, we can let the UNION do the expression rewriting
Expand Down
14 changes: 5 additions & 9 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (
)

type Filter struct {
Source ops.Operator
*singleSource

Predicates []sqlparser.Expr
}

var _ ops.PhysicalOperator = (*Filter)(nil)

func newFilter(op ops.Operator, expr sqlparser.Expr) ops.Operator {
return &Filter{
Source: op, Predicates: []sqlparser.Expr{expr},
singleSource: &singleSource{Source: op}, Predicates: []sqlparser.Expr{expr},
}
}

Expand All @@ -45,16 +46,11 @@ func (f *Filter) Clone(inputs []ops.Operator) ops.Operator {
predicatesClone := make([]sqlparser.Expr, len(f.Predicates))
copy(predicatesClone, f.Predicates)
return &Filter{
Source: inputs[0],
Predicates: predicatesClone,
singleSource: &singleSource{Source: inputs[0]},
Predicates: predicatesClone,
}
}

// Inputs implements the Operator interface
func (f *Filter) Inputs() []ops.Operator {
return []ops.Operator{f.Source}
}

// UnsolvedPredicates implements the unresolved interface
func (f *Filter) UnsolvedPredicates(st *semantics.SemTable) []sqlparser.Expr {
var result []sqlparser.Expr
Expand Down
18 changes: 11 additions & 7 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
// Horizon is an operator we use until we decide how to handle the source to the horizon.
// It contains information about the planning we have to do after deciding how we will send the query to the tablets.
type Horizon struct {
Source ops.Operator
*singleSource

Select sqlparser.SelectStatement

noColumns
Expand All @@ -36,6 +37,13 @@ var _ ops.PhysicalOperator = (*Horizon)(nil)

func (h *Horizon) IPhysical() {}

func newHorizon(src ops.Operator, stmt sqlparser.SelectStatement) ops.Operator {
return &Horizon{
singleSource: &singleSource{src},
Select: stmt,
}
}

func (h *Horizon) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) {
newSrc, err := h.Source.AddPredicate(ctx, expr)
if err != nil {
Expand All @@ -47,11 +55,7 @@ func (h *Horizon) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.

func (h *Horizon) Clone(inputs []ops.Operator) ops.Operator {
return &Horizon{
Source: inputs[0],
Select: h.Select,
singleSource: &singleSource{inputs[0]},
Select: h.Select,
}
}

func (h *Horizon) Inputs() []ops.Operator {
return []ops.Operator{h.Source}
}
15 changes: 2 additions & 13 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@ limitations under the License.
package operators

import (
"errors"

"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite"

"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
)

var errNotHorizonPlanned = errors.New("query can't be fully operator planned")

func planHorizons(in ops.Operator) (ops.Operator, error) {
return rewrite.TopDown(in, func(in ops.Operator) (ops.Operator, rewrite.TreeIdentity, rewrite.VisitRule, error) {
switch in := in.(type) {
Expand All @@ -45,16 +41,9 @@ func planHorizons(in ops.Operator) (ops.Operator, error) {

func planHorizon(in *Horizon) (ops.Operator, error) {
rb, isRoute := in.Source.(*Route)
if !isRoute {
return in, nil
}
if isRoute && rb.IsSingleShard() && in.Select.GetLimit() == nil {
return planSingleShardRoute(rb, in)
return swap(in, rb), nil
}

return nil, errNotHorizonPlanned
}
func planSingleShardRoute(rb *Route, horizon *Horizon) (ops.Operator, error) {
rb.Source, horizon.Source = horizon, rb.Source
return rb, nil
return in, nil
}
38 changes: 14 additions & 24 deletions go/vt/vtgate/planbuilder/operators/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,10 @@ func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.S
}
}
if subq == nil {
return &Horizon{
Source: op,
Select: sel,
}, nil
return newHorizon(op, sel), nil
}
subq.Outer = op
return &Horizon{
Source: subq,
Select: sel,
}, nil
return newHorizon(subq, sel), nil
}

func createOperatorFromUnion(ctx *plancontext.PlanningContext, node *sqlparser.Union) (ops.Operator, error) {
Expand All @@ -100,7 +94,7 @@ func createOperatorFromUnion(ctx *plancontext.PlanningContext, node *sqlparser.U
Sources: []ops.Operator{opLHS, opRHS},
Ordering: node.OrderBy,
}
return &Horizon{Source: union, Select: node}, nil
return newHorizon(union, node), nil
}

func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlparser.Update) (ops.Operator, error) {
Expand All @@ -124,20 +118,22 @@ func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlpars
return nil, err
}

r := &Route{
Source: &Update{
r := newRoute(
&Update{
QTable: qt,
VTable: vindexTable,
Assignments: assignments,
ChangedVindexValues: cvv,
OwnedVindexQuery: ovq,
AST: updStmt,
},
RouteOpCode: opCode,
Keyspace: vindexTable.Keyspace,
VindexPreds: vp,
TargetDestination: dest,
}
opCode,
vindexTable.Keyspace,
dest,
vp,
nil,
nil,
)
Comment on lines +121 to +136
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker at all, but just my personal opinion: It's quite unfortunate go does not support named arguments. The previous version that build the struct directly was much more readable, as you could see what the individual struct members were set to, where as now newRoute takes a bunch of values but it's not obvious what they mean (e.g. those two nil arguments at the end).


for _, predicate := range qt.Predicates {
err := r.UpdateRoutingLogic(ctx, predicate)
Expand Down Expand Up @@ -178,12 +174,7 @@ func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlp
VTable: vindexTable,
AST: deleteStmt,
}
route := &Route{
Source: del,
RouteOpCode: opCode,
Keyspace: vindexTable.Keyspace,
TargetDestination: dest,
}
route := newRoute(del, opCode, vindexTable.Keyspace, dest, nil, nil, nil)

if !vindexTable.Keyspace.Sharded {
return route, nil
Expand Down Expand Up @@ -295,8 +286,7 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr
if horizon, ok := inner.(*Horizon); ok {
inner = horizon.Source
}

return &Derived{Alias: tableExpr.As.String(), Source: inner, Query: tbl.Select, ColumnAliases: tableExpr.Columns}, nil
return newDerived(inner, tbl.Select, tableExpr.As.String(), tableExpr.Columns), err
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unable to use: %T", tbl)
}
Expand Down
39 changes: 34 additions & 5 deletions go/vt/vtgate/planbuilder/operators/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ type (

// helper type that implements AddPredicate() returning an error
noPredicates struct{}

singleSource struct {
Source ops.Operator
}

singleSourceI interface {
ops.Operator
getSource() ops.Operator
setSource(ops.Operator)
}
)

func PlanQuery(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (ops.Operator, error) {
Expand All @@ -67,12 +77,8 @@ func PlanQuery(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (o
return nil, err
}

backup := Clone(op)

op, err = planHorizons(op)
if err == errNotHorizonPlanned {
op = backup
} else if err != nil {
if err != nil {
return nil, err
}

Expand All @@ -88,6 +94,19 @@ func (noInputs) Inputs() []ops.Operator {
return nil
}

// Inputs implements the Operator interface
func (s *singleSource) Inputs() []ops.Operator {
return []ops.Operator{s.Source}
}

func (s *singleSource) getSource() ops.Operator {
return s.Source
}

func (s *singleSource) setSource(src ops.Operator) {
s.Source = src
}

// AddColumn implements the Operator interface
func (noColumns) AddColumn(*plancontext.PlanningContext, sqlparser.Expr) (int, error) {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "this operator cannot accept columns")
Expand All @@ -97,3 +116,13 @@ func (noColumns) AddColumn(*plancontext.PlanningContext, sqlparser.Expr) (int, e
func (noPredicates) AddPredicate(*plancontext.PlanningContext, sqlparser.Expr) (ops.Operator, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "this operator cannot accept predicates")
}

// swap takes two operators with a single source and swaps them. the assumption is that the first op provided has the
// second operator as source. this method will the two operators with each other and return the new root.
// this method is used to push one operator underneath the other
func swap(op, src singleSourceI) ops.Operator {
tmp := src.getSource()
src.setSource(op)
op.setSource(tmp)
return src
}
40 changes: 26 additions & 14 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

type (
Route struct {
Source ops.Operator
*singleSource

RouteOpCode engine.Opcode
Keyspace *vindexes.Keyspace
Expand Down Expand Up @@ -89,6 +89,26 @@ type (

var _ ops.PhysicalOperator = (*Route)(nil)

func newRoute(
src ops.Operator,
opCode engine.Opcode,
keyspace *vindexes.Keyspace,
destination key.Destination,
vindexPreds []*VindexPlusPredicates,
sysTableTableSchema []evalengine.Expr,
sysTableTableName map[string]evalengine.Expr,
) *Route {
return &Route{
singleSource: &singleSource{src},
RouteOpCode: opCode,
Keyspace: keyspace,
VindexPreds: vindexPreds,
SysTableTableSchema: sysTableTableSchema,
SysTableTableName: sysTableTableName,
TargetDestination: destination,
}
}

// IPhysical implements the PhysicalOperator interface
func (*Route) IPhysical() {}

Expand Down Expand Up @@ -120,7 +140,7 @@ func (r *Route) Cost() int {
// Clone implements the Operator interface
func (r *Route) Clone(inputs []ops.Operator) ops.Operator {
cloneRoute := *r
cloneRoute.Source = inputs[0]
cloneRoute.singleSource = &singleSource{inputs[0]}
cloneRoute.VindexPreds = make([]*VindexPlusPredicates, len(r.VindexPreds))
for i, pred := range r.VindexPreds {
// we do this to create a copy of the struct
Expand All @@ -130,11 +150,6 @@ func (r *Route) Clone(inputs []ops.Operator) ops.Operator {
return &cloneRoute
}

// Inputs implements the Operator interface
func (r *Route) Inputs() []ops.Operator {
return []ops.Operator{r.Source}
}

func (r *Route) UpdateRoutingLogic(ctx *plancontext.PlanningContext, expr sqlparser.Expr) error {
r.SeenPredicates = append(r.SeenPredicates, expr)
return r.tryImprovingVindex(ctx, expr)
Expand Down Expand Up @@ -743,13 +758,10 @@ func createRoute(ctx *plancontext.PlanningContext, table *QueryTable, solves sem
table.Alias.As = sqlparser.NewIdentifierCS(name.String())
}
}
plan := &Route{
Source: &Table{
QTable: table,
VTable: vschemaTable,
},
Keyspace: vschemaTable.Keyspace,
}
plan := newRoute(&Table{
QTable: table,
VTable: vschemaTable,
}, 0, vschemaTable.Keyspace, nil, nil, nil, nil)

for _, columnVindex := range vschemaTable.ColumnVindexes {
plan.VindexPreds = append(plan.VindexPreds, &VindexPlusPredicates{ColVindex: columnVindex, TableID: solves})
Expand Down
Loading