Skip to content

Commit

Permalink
need to handle errors better
Browse files Browse the repository at this point in the history
  • Loading branch information
wcharczuk committed Feb 12, 2024
1 parent c5d90e0 commit 25d4c54
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 63 deletions.
115 changes: 63 additions & 52 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,93 +240,75 @@ func (graph *Graph) checkIfUnnecessary(n INode) {
}
}

func (graph *Graph) becameNecessary(node INode) {
graph.initializeNode(node)
_ = graph.adjustHeightsHeap.setHeight(node, heightFromScope(node)+1)
func (graph *Graph) becameNecessary(node INode) (err error) {
if err = graph.addNodeOrObserver(node); err != nil {
return
}
if err = graph.adjustHeightsHeap.setHeight(node, heightFromScope(node)+1); err != nil {
return
}
for _, p := range node.Node().parents {
if p.Node().height >= node.Node().height {
_ = graph.adjustHeightsHeap.setHeight(node, p.Node().height+1)
if err = graph.adjustHeightsHeap.setHeight(node, p.Node().height+1); err != nil {
return
}
}
if err = graph.becameNecessary(p); err != nil {
return
}
graph.becameNecessary(p)
}

if node.Node().ShouldRecompute() {
graph.recomputeHeap.add(node)
}
return
}

func (graph *Graph) isNecessary(n INode) bool {
nn := n.Node()
if _, isObserver := n.(IObserver); isObserver {
if nn.observer {
return true
}
return len(nn.children) > 0 || len(nn.observers) > 0
}

func (graph *Graph) initializeNode(gn INode) {
gnn := gn.Node()
gnn.graph = graph
graphAlreadyHasNode := graph.maybeAddNodeToGraph(gn)
if graphAlreadyHasNode {
return
func (graph *Graph) addNodeOrObserver(gn INode) error {
typedObserver, isObserver := gn.(IObserver)
if isObserver {
return graph.addObserver(typedObserver)
}
graph.numNodes++
gnn.detectCutoff(gn)
gnn.detectAlways(gn)
gnn.detectStabilize(gn)
return graph.addNode(gn)
}

func (graph *Graph) maybeAddNodeToGraph(gn INode) (ok bool) {
func (graph *Graph) addNode(n INode) error {
graph.nodesMu.Lock()
defer graph.nodesMu.Unlock()
if _, ok = graph.nodes[gn.Node().id]; ok {
return
}
graph.nodes[gn.Node().id] = gn
return
}

func (graph *Graph) removeNodeFromGraph(gn INode) {
graph.recomputeHeap.remove(gn)
graph.adjustHeightsHeap.remove(gn)

graph.nodesMu.Lock()
delete(graph.nodes, gn.Node().id)
graph.nodesMu.Unlock()

graph.numNodes--

gnn := gn.Node()

graph.handleAfterStabilizationMu.Lock()
delete(graph.handleAfterStabilization, gnn.ID())
graph.handleAfterStabilizationMu.Unlock()

gnn.setAt = 0
gnn.boundAt = 0
gnn.recomputedAt = 0

// NOTE (wc): we never _really_ can remove the createdIn reference because
// we don't track construction of nodes carefully.
// gnn.createdIn = nil
gnn.graph = nil
gnn.height = 0
gnn.heightInRecomputeHeap = 0
gnn.heightInAdjustHeightsHeap = 0
gnn := n.Node()
_, graphAlreadyHasNode := graph.nodes[gnn.id]
if graphAlreadyHasNode {
return nil
}
gnn.graph = graph
graph.numNodes++
gnn.detectAlways(n)
gnn.detectCutoff(n)
gnn.detectStabilize(n)
graph.nodes[gnn.id] = n
return nil
}

func (graph *Graph) addObserver(on IObserver) error {
onn := on.Node()
onn.graph = graph

graph.observersMu.Lock()
if _, ok := graph.observers[onn.id]; !ok {
graph.numNodes++
graph.observers[onn.id] = on
}
graph.observersMu.Unlock()
onn.detectStabilize(on)

onn.detectObserver(on)
if err := graph.adjustHeights(on); err != nil {
return err
}
Expand Down Expand Up @@ -358,6 +340,35 @@ func (graph *Graph) removeObserver(on IObserver) {
onn.recomputedAt = 0
}

func (graph *Graph) removeNodeFromGraph(gn INode) {
graph.recomputeHeap.remove(gn)
graph.adjustHeightsHeap.remove(gn)

graph.nodesMu.Lock()
delete(graph.nodes, gn.Node().id)
graph.nodesMu.Unlock()

graph.numNodes--

gnn := gn.Node()

graph.handleAfterStabilizationMu.Lock()
delete(graph.handleAfterStabilization, gnn.ID())
graph.handleAfterStabilizationMu.Unlock()

gnn.setAt = 0
gnn.boundAt = 0
gnn.recomputedAt = 0

// NOTE (wc): we never _really_ can remove the createdIn reference because
// we don't track construction of nodes carefully.
// gnn.createdIn = nil
gnn.graph = nil
gnn.height = 0
gnn.heightInRecomputeHeap = 0
gnn.heightInAdjustHeightsHeap = 0
}

func (graph *Graph) adjustHeights(node INode) error {
_ = graph.adjustHeightsHeap.setHeight(node, heightFromScope(node)+1)
for _, p := range node.Node().parents {
Expand Down
11 changes: 8 additions & 3 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@ package incr
//
// An error is returned if the provided inputs to the child node
// would produce a cycle.
func Link(child INode, parents ...INode) {
func Link(child INode, parents ...INode) error {
graph := graphFromScope(child)
wasNecessary := graph.isNecessary(child)
child.Node().addParents(parents...)
for _, parent := range parents {
parent.Node().addChildren(child)
}
if !wasNecessary {
graph.becameNecessary(child)
if err := graph.becameNecessary(child); err != nil {
return err
}
}
for _, parent := range parents {
_ = graph.adjustHeightsHeap.adjustHeights(graph.recomputeHeap, child, parent)
if err := graph.adjustHeightsHeap.adjustHeights(graph.recomputeHeap, child, parent); err != nil {
return err
}
}
return nil
}

func graphFromScope(n INode) *Graph {
Expand Down
8 changes: 8 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type Node struct {
// cutoff is set during initialization and is a shortcut
// to the interface sniff for the node for the ICutoff interface.
cutoff func(context.Context) (bool, error)
// observer determines if this node is an observer.
observer bool
// always determines if we always recompute this node.
always bool
// numRecomputes is the number of times we recomputed the node
Expand Down Expand Up @@ -295,6 +297,12 @@ func (n *Node) detectAlways(gn INode) {
_, n.always = gn.(IAlways)
}

// detectObserver detects if a INode (which should be the same
// as as managed by this node reference), implements IObserver.
func (n *Node) detectObserver(gn INode) {
_, n.observer = gn.(IObserver)
}

// detectStabilize detects if a INode (which should be the same
// as as managed by this node reference), implements IStabilize
// and grabs a reference to the Stabilize delegate function.
Expand Down
11 changes: 3 additions & 8 deletions observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,10 @@ func Observe[A any](g *Graph, input Incr[A]) ObserveIncr[A] {
input: input,
})
Link(o, input)
// NOTE(wc): we do this here because some """expert""" use cases for `ExpertGraph::AddObserver`
// require us to add the observer to the graph observer list but _not_
// add it to the recompute heap.
//
// So we just add it here explicitly and don't add it implicitly
// in the AddObserver function.
_ = g.addObserver(o)
input.Node().addObservers(o)
g.becameNecessary(input)
if err := g.becameNecessary(input); err != nil {
panic(err)
}
g.recomputeHeap.add(o)
return o
}
Expand Down

0 comments on commit 25d4c54

Please sign in to comment.