Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Rework limit node flow #767

Merged
merged 3 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions query/graphql/mapper/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,23 +846,47 @@ func RunFilter(doc interface{}, filter *Filter) (bool, error) {
}

// equal compares the given Targetables and returns true if they can be considered equal.
// Note: Currently only compares Name and Filter as that is all that is currently required,
// Note: Currently only compares Name, Filter and Limit as that is all that is currently required,
// but this should be extended in the future.
func (s Targetable) equal(other Targetable) bool {
if s.Index != other.Index &&
s.Name != other.Name {
return false
}

if s.Filter == nil {
return other.Filter == nil
if !s.Filter.equal(other.Filter) {
return false
}

if !s.Limit.equal(other.Limit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: Since you are adding this new Limit comparison, please update the above Note.

suggestion(non-blocking): Removal of these lines:

// Note: Currently only compares Name and Filter as that is all that is currently required,
// but this should be extended in the future.

Copy link
Contributor Author

@AndrewSisley AndrewSisley Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah cheers, I saw that and forgot about it by the time I'd fixed the tests. Changing.

  • Update comment

return false
}

return true
}

func (l *Limit) equal(other *Limit) bool {
if l == nil {
return other == nil
}

if other == nil {
return l == nil
}

return l.Limit == other.Limit && l.Offset == other.Offset
}

func (f *Filter) equal(other *Filter) bool {
if f == nil {
return other == nil
}

if other.Filter == nil {
return s.Filter == nil
if other == nil {
return f == nil
}

return reflect.DeepEqual(s.Filter.Conditions, other.Filter.Conditions)
return reflect.DeepEqual(f.Conditions, other.Conditions)
}

// aggregateRequest is an intermediary struct defining a consumer-requested
Expand Down
3 changes: 1 addition & 2 deletions query/graphql/planner/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (
_ explainablePlanNode = (*dagScanNode)(nil)
_ explainablePlanNode = (*deleteNode)(nil)
_ explainablePlanNode = (*groupNode)(nil)
_ explainablePlanNode = (*hardLimitNode)(nil)
_ explainablePlanNode = (*limitNode)(nil)
_ explainablePlanNode = (*orderNode)(nil)
_ explainablePlanNode = (*renderLimitNode)(nil)
_ explainablePlanNode = (*scanNode)(nil)
_ explainablePlanNode = (*selectNode)(nil)
_ explainablePlanNode = (*selectTopNode)(nil)
Expand Down
101 changes: 14 additions & 87 deletions query/graphql/planner/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ package planner
import (
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/query/graphql/mapper"
parserTypes "github.com/sourcenetwork/defradb/query/graphql/parser/types"
)

// Limit the results, yielding only what the limit/offset permits
// @todo: Handle cursor
type hardLimitNode struct {
type limitNode struct {
docMapper

p *Planner
Expand All @@ -29,12 +28,12 @@ type hardLimitNode struct {
rowIndex int64
}

// HardLimit creates a new hardLimitNode initalized from the parser.Limit object.
func (p *Planner) HardLimit(parsed *mapper.Select, n *mapper.Limit) (*hardLimitNode, error) {
// Limit creates a new limitNode initalized from the parser.Limit object.
func (p *Planner) Limit(parsed *mapper.Select, n *mapper.Limit) (*limitNode, error) {
if n == nil {
return nil, nil // nothing to do
}
return &hardLimitNode{
return &limitNode{
p: p,
limit: n.Limit,
offset: n.Offset,
Expand All @@ -43,21 +42,21 @@ func (p *Planner) HardLimit(parsed *mapper.Select, n *mapper.Limit) (*hardLimitN
}, nil
}

func (n *hardLimitNode) Kind() string {
return "hardLimitNode"
func (n *limitNode) Kind() string {
return "limitNode"
}

func (n *hardLimitNode) Init() error {
func (n *limitNode) Init() error {
n.rowIndex = 0
return n.plan.Init()
}

func (n *hardLimitNode) Start() error { return n.plan.Start() }
func (n *hardLimitNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
func (n *hardLimitNode) Close() error { return n.plan.Close() }
func (n *hardLimitNode) Value() core.Doc { return n.plan.Value() }
func (n *limitNode) Start() error { return n.plan.Start() }
func (n *limitNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
func (n *limitNode) Close() error { return n.plan.Close() }
func (n *limitNode) Value() core.Doc { return n.plan.Value() }

func (n *hardLimitNode) Next() (bool, error) {
func (n *limitNode) Next() (bool, error) {
// check if we're passed the limit
if n.limit != 0 && n.rowIndex-n.offset >= n.limit {
return false, nil
Expand All @@ -79,81 +78,9 @@ func (n *hardLimitNode) Next() (bool, error) {
return true, nil
}

func (n *hardLimitNode) Source() planNode { return n.plan }
func (n *limitNode) Source() planNode { return n.plan }

func (n *hardLimitNode) Explain() (map[string]interface{}, error) {
exp := map[string]interface{}{
limitLabel: n.limit,
offsetLabel: n.offset,
}

if n.limit == 0 {
exp[limitLabel] = nil
}

return exp, nil
}

// limit the results, flagging any records outside the bounds of limit/offset with
// with a 'hidden' flag blocking rendering. Used if consumers of the results require
// the full dataset.
type renderLimitNode struct {
documentIterator
docMapper

p *Planner
plan planNode

limit int64
offset int64
rowIndex int64
}

// RenderLimit creates a new renderLimitNode initalized from
// the parser.Limit object.
func (p *Planner) RenderLimit(docMap *core.DocumentMapping, n *parserTypes.Limit) (*renderLimitNode, error) {
if n == nil {
return nil, nil // nothing to do
}
return &renderLimitNode{
p: p,
limit: n.Limit,
offset: n.Offset,
rowIndex: 0,
docMapper: docMapper{docMap},
}, nil
}

func (n *renderLimitNode) Kind() string {
return "renderLimitNode"
}

func (n *renderLimitNode) Init() error {
n.rowIndex = 0
return n.plan.Init()
}

func (n *renderLimitNode) Start() error { return n.plan.Start() }
func (n *renderLimitNode) Spans(spans core.Spans) { n.plan.Spans(spans) }
func (n *renderLimitNode) Close() error { return n.plan.Close() }

func (n *renderLimitNode) Next() (bool, error) {
if next, err := n.plan.Next(); !next {
return false, err
}

n.currentValue = n.plan.Value()

n.rowIndex++
if (n.limit != 0 && n.rowIndex-n.offset > n.limit) || n.rowIndex <= n.offset {
n.currentValue.Hidden = true
}
return true, nil
}

func (n *renderLimitNode) Source() planNode { return n.plan }

func (n *renderLimitNode) Explain() (map[string]interface{}, error) {
func (n *limitNode) Explain() (map[string]interface{}, error) {
exp := map[string]interface{}{
limitLabel: n.limit,
offsetLabel: n.offset,
Expand Down
3 changes: 1 addition & 2 deletions query/graphql/planner/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ var (
_ planNode = (*dagScanNode)(nil)
_ planNode = (*deleteNode)(nil)
_ planNode = (*groupNode)(nil)
_ planNode = (*hardLimitNode)(nil)
_ planNode = (*limitNode)(nil)
_ planNode = (*headsetScanNode)(nil)
_ planNode = (*multiScanNode)(nil)
_ planNode = (*orderNode)(nil)
_ planNode = (*parallelNode)(nil)
_ planNode = (*pipeNode)(nil)
_ planNode = (*renderLimitNode)(nil)
_ planNode = (*scanNode)(nil)
_ planNode = (*selectNode)(nil)
_ planNode = (*selectTopNode)(nil)
Expand Down
60 changes: 13 additions & 47 deletions query/graphql/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,7 @@ func (p *Planner) expandSelectTopNodePlan(plan *selectTopNode, parentPlan *selec
}

if plan.limit != nil {
err := p.expandLimitPlan(plan, parentPlan)
if err != nil {
return err
}
p.expandLimitPlan(plan, parentPlan)
}

return nil
Expand Down Expand Up @@ -372,51 +369,20 @@ func (p *Planner) expandGroupNodePlan(plan *selectTopNode) error {
return nil
}

func (p *Planner) expandLimitPlan(plan *selectTopNode, parentPlan *selectTopNode) error {
switch l := plan.limit.(type) {
case *hardLimitNode:
if l == nil {
return nil
}

// Limits get more complicated with groups and have to be handled internally, so we ensure
// any limit plan is disabled here
if parentPlan != nil && parentPlan.group != nil && len(parentPlan.group.childSelects) != 0 {
plan.limit = nil
return nil
}

// if this is a child node, and the parent select has an aggregate then we need to
// replace the hard limit with a render limit to allow the full set of child records
// to be aggregated
if parentPlan != nil && len(parentPlan.aggregates) > 0 {
renderLimit, err := p.RenderLimit(
parentPlan.documentMapping,
&parserTypes.Limit{
Offset: l.offset,
Limit: l.limit,
},
)
if err != nil {
return err
}
plan.limit = renderLimit

renderLimit.plan = plan.plan
plan.plan = plan.limit
} else {
l.plan = plan.plan
plan.plan = plan.limit
}
case *renderLimitNode:
if l == nil {
return nil
}
func (p *Planner) expandLimitPlan(plan *selectTopNode, parentPlan *selectTopNode) {
if plan.limit == nil {
return
}

l.plan = plan.plan
plan.plan = plan.limit
// Limits get more complicated with groups and have to be handled internally, so we ensure
// any limit plan is disabled here
if parentPlan != nil && parentPlan.group != nil && len(parentPlan.group.childSelects) != 0 {
plan.limit = nil
return
}
return nil

plan.limit.plan = plan.plan
plan.plan = plan.limit
}

// walkAndReplace walks through the provided plan, and searches for an instance
Expand Down
6 changes: 3 additions & 3 deletions query/graphql/planner/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type selectTopNode struct {

group *groupNode
order *orderNode
limit planNode
limit *limitNode
aggregates []aggregateNode

// selectnode is used pre-wiring of the plan (before expansion and all).
Expand Down Expand Up @@ -387,7 +387,7 @@ func (p *Planner) SelectFromSource(
return nil, err
}

limitPlan, err := p.HardLimit(parsed, limit)
limitPlan, err := p.Limit(parsed, limit)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (p *Planner) Select(parsed *mapper.Select) (planNode, error) {
return nil, err
}

limitPlan, err := p.HardLimit(parsed, limit)
limitPlan, err := p.Limit(parsed, limit)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/query/explain/group_with_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestExplainGroupByWithGroupLimitAndOffsetOnParentGroupBy(t *testing.T) {
{
"explain": dataMap{
"selectTopNode": dataMap{
"hardLimitNode": dataMap{
"limitNode": dataMap{
"limit": int64(1),
"offset": int64(1),
"groupNode": dataMap{
Expand Down Expand Up @@ -367,7 +367,7 @@ func TestExplainGroupByWithGroupLimitOnParentAndChild(t *testing.T) {
{
"explain": dataMap{
"selectTopNode": dataMap{
"hardLimitNode": dataMap{
"limitNode": dataMap{
"limit": int64(1),
"offset": int64(0),
"groupNode": dataMap{
Expand Down
Loading