Skip to content

Commit

Permalink
refactor: Rework limit node flow (sourcenetwork#767)
Browse files Browse the repository at this point in the history
* Add tests for one-many limits

* Simplify limit logic to make use of mapper

Some loss of efficiency here, I think the new limit nodes will make use of the same valuesNode (so not fetching from file twice) but not sure.  The planner should be able to optimise this better in the future by using the (bigger) limitNode as the source of the smaller limit node or something similar.
  • Loading branch information
AndrewSisley committed Aug 31, 2022
1 parent b6c5f9f commit 1be6032
Show file tree
Hide file tree
Showing 10 changed files with 677 additions and 514 deletions.
36 changes: 30 additions & 6 deletions query/graphql/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,23 +846,47 @@ func RunFilter(doc interface{}, filter *Filter) (bool, error) {
}

// equal compares the given Targetables and returns true if they can be considered equal.
// Note: Currently only compares Name and Filter as that is all that is currently required,
// Note: Currently only compares Name, Filter and Limit as that is all that is currently required,
// but this should be extended in the future.
func (s Targetable) equal(other Targetable) bool {
if s.Index != other.Index &&
s.Name != other.Name {
return false
}

if s.Filter == nil {
return other.Filter == nil
if !s.Filter.equal(other.Filter) {
return false
}

if !s.Limit.equal(other.Limit) {
return false
}

return true
}

func (l *Limit) equal(other *Limit) bool {
if l == nil {
return other == nil
}

if other == nil {
return l == nil
}

return l.Limit == other.Limit && l.Offset == other.Offset
}

func (f *Filter) equal(other *Filter) bool {
if f == nil {
return other == nil
}

if other.Filter == nil {
return s.Filter == nil
if other == nil {
return f == nil
}

return reflect.DeepEqual(s.Filter.Conditions, other.Filter.Conditions)
return reflect.DeepEqual(f.Conditions, other.Conditions)
}

// aggregateRequest is an intermediary struct defining a consumer-requested
Expand Down
3 changes: 1 addition & 2 deletions query/graphql/planner/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (
_ explainablePlanNode = (*dagScanNode)(nil)
_ explainablePlanNode = (*deleteNode)(nil)
_ explainablePlanNode = (*groupNode)(nil)
_ explainablePlanNode = (*hardLimitNode)(nil)
_ explainablePlanNode = (*limitNode)(nil)
_ explainablePlanNode = (*orderNode)(nil)
_ explainablePlanNode = (*renderLimitNode)(nil)
_ explainablePlanNode = (*scanNode)(nil)
_ explainablePlanNode = (*selectNode)(nil)
_ explainablePlanNode = (*selectTopNode)(nil)
Expand Down
101 changes: 14 additions & 87 deletions query/graphql/planner/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ package planner
import (
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/query/graphql/mapper"
parserTypes "github.com/sourcenetwork/defradb/query/graphql/parser/types"
)

// Limit the results, yielding only what the limit/offset permits
// @todo: Handle cursor
type hardLimitNode struct {
type limitNode struct {
docMapper

p *Planner
Expand All @@ -29,12 +28,12 @@ type hardLimitNode struct {
rowIndex int64
}

// HardLimit creates a new hardLimitNode initalized from the parser.Limit object.
func (p *Planner) HardLimit(parsed *mapper.Select, n *mapper.Limit) (*hardLimitNode, error) {
// Limit creates a new limitNode initalized from the parser.Limit object.
func (p *Planner) Limit(parsed *mapper.Select, n *mapper.Limit) (*limitNode, error) {
if n == nil {
return nil, nil // nothing to do
}
return &hardLimitNode{
return &limitNode{
p: p,
limit: n.Limit,
offset: n.Offset,
Expand All @@ -43,21 +42,21 @@ func (p *Planner) HardLimit(parsed *mapper.Select, n *mapper.Limit) (*hardLimitN
}, nil
}

func (n *hardLimitNode) Kind() string {
return "hardLimitNode"
func (n *limitNode) Kind() string {
return "limitNode"
}

func (n *hardLimitNode) Init() error {
func (n *limitNode) Init() error {
n.rowIndex = 0
return n.plan.Init()
}

func (n *hardLimitNode) Start() error { return n.plan.Start() }
func (n *hardLimitNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
func (n *hardLimitNode) Close() error { return n.plan.Close() }
func (n *hardLimitNode) Value() core.Doc { return n.plan.Value() }
func (n *limitNode) Start() error { return n.plan.Start() }
func (n *limitNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
func (n *limitNode) Close() error { return n.plan.Close() }
func (n *limitNode) Value() core.Doc { return n.plan.Value() }

func (n *hardLimitNode) Next() (bool, error) {
func (n *limitNode) Next() (bool, error) {
// check if we're passed the limit
if n.limit != 0 && n.rowIndex-n.offset >= n.limit {
return false, nil
Expand All @@ -79,81 +78,9 @@ func (n *hardLimitNode) Next() (bool, error) {
return true, nil
}

func (n *hardLimitNode) Source() planNode { return n.plan }
func (n *limitNode) Source() planNode { return n.plan }

func (n *hardLimitNode) Explain() (map[string]interface{}, error) {
exp := map[string]interface{}{
limitLabel: n.limit,
offsetLabel: n.offset,
}

if n.limit == 0 {
exp[limitLabel] = nil
}

return exp, nil
}

// limit the results, flagging any records outside the bounds of limit/offset with
// with a 'hidden' flag blocking rendering. Used if consumers of the results require
// the full dataset.
type renderLimitNode struct {
documentIterator
docMapper

p *Planner
plan planNode

limit int64
offset int64
rowIndex int64
}

// RenderLimit creates a new renderLimitNode initalized from
// the parser.Limit object.
func (p *Planner) RenderLimit(docMap *core.DocumentMapping, n *parserTypes.Limit) (*renderLimitNode, error) {
if n == nil {
return nil, nil // nothing to do
}
return &renderLimitNode{
p: p,
limit: n.Limit,
offset: n.Offset,
rowIndex: 0,
docMapper: docMapper{docMap},
}, nil
}

func (n *renderLimitNode) Kind() string {
return "renderLimitNode"
}

func (n *renderLimitNode) Init() error {
n.rowIndex = 0
return n.plan.Init()
}

func (n *renderLimitNode) Start() error { return n.plan.Start() }
func (n *renderLimitNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
func (n *renderLimitNode) Close() error { return n.plan.Close() }

func (n *renderLimitNode) Next() (bool, error) {
if next, err := n.plan.Next(); !next {
return false, err
}

n.currentValue = n.plan.Value()

n.rowIndex++
if (n.limit != 0 && n.rowIndex-n.offset > n.limit) || n.rowIndex <= n.offset {
n.currentValue.Hidden = true
}
return true, nil
}

func (n *renderLimitNode) Source() planNode { return n.plan }

func (n *renderLimitNode) Explain() (map[string]interface{}, error) {
func (n *limitNode) Explain() (map[string]interface{}, error) {
exp := map[string]interface{}{
limitLabel: n.limit,
offsetLabel: n.offset,
Expand Down
3 changes: 1 addition & 2 deletions query/graphql/planner/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ var (
_ planNode = (*dagScanNode)(nil)
_ planNode = (*deleteNode)(nil)
_ planNode = (*groupNode)(nil)
_ planNode = (*hardLimitNode)(nil)
_ planNode = (*limitNode)(nil)
_ planNode = (*headsetScanNode)(nil)
_ planNode = (*multiScanNode)(nil)
_ planNode = (*orderNode)(nil)
_ planNode = (*parallelNode)(nil)
_ planNode = (*pipeNode)(nil)
_ planNode = (*renderLimitNode)(nil)
_ planNode = (*scanNode)(nil)
_ planNode = (*selectNode)(nil)
_ planNode = (*selectTopNode)(nil)
Expand Down
60 changes: 13 additions & 47 deletions query/graphql/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,7 @@ func (p *Planner) expandSelectTopNodePlan(plan *selectTopNode, parentPlan *selec
}

if plan.limit != nil {
err := p.expandLimitPlan(plan, parentPlan)
if err != nil {
return err
}
p.expandLimitPlan(plan, parentPlan)
}

return nil
Expand Down Expand Up @@ -372,51 +369,20 @@ func (p *Planner) expandGroupNodePlan(plan *selectTopNode) error {
return nil
}

func (p *Planner) expandLimitPlan(plan *selectTopNode, parentPlan *selectTopNode) error {
switch l := plan.limit.(type) {
case *hardLimitNode:
if l == nil {
return nil
}

// Limits get more complicated with groups and have to be handled internally, so we ensure
// any limit plan is disabled here
if parentPlan != nil && parentPlan.group != nil && len(parentPlan.group.childSelects) != 0 {
plan.limit = nil
return nil
}

// if this is a child node, and the parent select has an aggregate then we need to
// replace the hard limit with a render limit to allow the full set of child records
// to be aggregated
if parentPlan != nil && len(parentPlan.aggregates) > 0 {
renderLimit, err := p.RenderLimit(
parentPlan.documentMapping,
&parserTypes.Limit{
Offset: l.offset,
Limit: l.limit,
},
)
if err != nil {
return err
}
plan.limit = renderLimit

renderLimit.plan = plan.plan
plan.plan = plan.limit
} else {
l.plan = plan.plan
plan.plan = plan.limit
}
case *renderLimitNode:
if l == nil {
return nil
}
func (p *Planner) expandLimitPlan(plan *selectTopNode, parentPlan *selectTopNode) {
if plan.limit == nil {
return
}

l.plan = plan.plan
plan.plan = plan.limit
// Limits get more complicated with groups and have to be handled internally, so we ensure
// any limit plan is disabled here
if parentPlan != nil && parentPlan.group != nil && len(parentPlan.group.childSelects) != 0 {
plan.limit = nil
return
}
return nil

plan.limit.plan = plan.plan
plan.plan = plan.limit
}

// walkAndReplace walks through the provided plan, and searches for an instance
Expand Down
6 changes: 3 additions & 3 deletions query/graphql/planner/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type selectTopNode struct {

group *groupNode
order *orderNode
limit planNode
limit *limitNode
aggregates []aggregateNode

// selectnode is used pre-wiring of the plan (before expansion and all).
Expand Down Expand Up @@ -387,7 +387,7 @@ func (p *Planner) SelectFromSource(
return nil, err
}

limitPlan, err := p.HardLimit(parsed, limit)
limitPlan, err := p.Limit(parsed, limit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (p *Planner) Select(parsed *mapper.Select) (planNode, error) {
return nil, err
}

limitPlan, err := p.HardLimit(parsed, limit)
limitPlan, err := p.Limit(parsed, limit)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/query/explain/group_with_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestExplainGroupByWithGroupLimitAndOffsetOnParentGroupBy(t *testing.T) {
{
"explain": dataMap{
"selectTopNode": dataMap{
"hardLimitNode": dataMap{
"limitNode": dataMap{
"limit": int64(1),
"offset": int64(1),
"groupNode": dataMap{
Expand Down Expand Up @@ -367,7 +367,7 @@ func TestExplainGroupByWithGroupLimitOnParentAndChild(t *testing.T) {
{
"explain": dataMap{
"selectTopNode": dataMap{
"hardLimitNode": dataMap{
"limitNode": dataMap{
"limit": int64(1),
"offset": int64(0),
"groupNode": dataMap{
Expand Down
Loading

0 comments on commit 1be6032

Please sign in to comment.