124 changes: 124 additions & 0 deletions src/Core/Incremental.fs
@@ -77,6 +77,130 @@ type IncrementalExtensions =
let prev = this.DelayZSet integrated // z^-1(I(Δa)) = snapshot before this tick
this.DistinctIncremental(prev, delta)

/// **Capability-aware incremental dispatcher.** Picks the right
/// incrementalization for `q` based on the algebra capability tags
/// on the operators `q` produces. Checks run in this order:
///
/// - terminal op has `IsSink = true` → throws (sinks are terminal
///   and can't be incrementalized; see PR 2's sink-terminality
///   enforcement)
/// - every op from the terminal op back to `input` has
///   `IsLinear = true` → `Q^Δ = Q` (trivial; deltas pass through)
/// - otherwise → `D ∘ Q ∘ I` fallback (always correct but
///   expensive: materializes the full integrated relation every
///   tick)
///
/// ## How the dispatch works (and important side-effects)
///
/// `q` is a `Stream → Stream` factory. We **probe** it by invoking
/// `q.Invoke input` *before* deciding the dispatch path; the
/// resulting stream's `Op` carries the capability tags from
/// PR 1. The probed output is then handled in one of three ways:
///
/// - **Linear case**: probed output returned directly (correct
/// because linear ops commute with `D`/`I`); no orphan.
/// - **Sink case**: throws — but the sink operator has *already
/// been registered* in the circuit by the probe `q.Invoke`.
/// - **Fallback case**: probed op kept in circuit as **orphan**
/// (no consumers); fallback rebuilds via `IncrementalizeZSet`.
///
/// ## ⚠️ Side-effect warning
///
/// **The probe `q.Invoke input` ALWAYS runs once, regardless of
/// dispatch path.** This has two implications callers must
/// account for:
///
/// 1. **Side-effecting builders execute**: if `q` is a factory
/// like `AdvancedExtensions.Inspect` that performs work at
/// construction time (logging, registering hooks, allocating
/// external resources), that work runs **even when the
/// dispatcher throws or falls back**. Use a non-side-effecting
/// `q` factory, or accept the probe-side-effect cost.
/// 2. **Circuit is mutated even on the throw path**: when the
/// sink case fires, the sink operator is **already in the
/// circuit** when the exception is raised. The caller's
/// circuit has been modified; catching the exception and
/// continuing leaves an orphan sink that will still be
/// scheduled and ticked. Either avoid catching and recovering from
/// `IncrementalAuto` exceptions on a circuit you'll continue
/// to use, or accept that the sink op stays registered.
///
/// A non-registering probe path would close both gaps but requires
/// architectural changes to the `Op` / `Circuit` contract
/// (registration rollback isn't currently supported). Tracked as
/// future work; the dispatcher's correctness on the happy paths
/// (linear + fallback) doesn't depend on it.
///
/// ## Known cost: orphan operator in the non-linear fallback path
///
/// When the fallback fires, the probed operator stays registered
/// in the circuit with no consumers — the scheduler will still tick
/// it every cycle (wasted work), but it produces no observable
/// output. Pruning unreachable operators at `Circuit.Build()` time
/// is a future improvement; the dispatcher's correctness doesn't
/// depend on it.
///
/// ## When NOT to use this
///
/// For bilinear joins, use `IncrementalJoin` directly — it has a
/// richer signature (key functions, combine function) that this
/// unary dispatcher can't express. An `IncrementalAutoJoin`
/// dispatcher for bilinear ops can be added in a later PR.
///
/// ## Reads
///
/// `Op.IsLinear` and `Op.IsSink` (PR 1). For internal operators
/// these are overridden in the concrete subclasses; for plugin
/// operators they're surfaced via the `PluginOperatorAdapter`
/// marker detection.
[<Extension>]
static member IncrementalAuto<'K when 'K : comparison>
    (this: Circuit,
     q: Func<Stream<ZSet<'K>>, Stream<ZSet<'K>>>,
     input: Stream<ZSet<'K>>) : Stream<ZSet<'K>> =
    // Probe: apply q to input directly to inspect the resulting
    // operator's capability tag. Registration is a side effect;
    // see "orphan operator" note above.
    let probedOutput = q.Invoke input
    let resultOp = probedOutput.Op
    // Check the WHOLE CHAIN from the probed terminal op back to
    // the original input, not just the terminal op's IsLinear
    // tag. A query like `q(s) = Map(Distinct(s))` has a terminal
    // Map (IsLinear=true) but a non-linear Distinct inside; the
    // composed query is NOT linear and the Q^Δ = Q rewrite would
    // produce wrong incremental results. Walk Inputs back to
    // `input.Op`: if every op in the chain is linear AND we reach
    // `input.Op` only through linear ops, the chain is linear.
    // Multi-input ops (Plus, Minus; currently default
    // IsLinear=false) correctly fall back via this check.
    let rec isLinearChainToInput (op: Op) (inputOp: Op) : bool =
        if System.Object.ReferenceEquals(op, inputOp) then true
        elif op.Inputs.Length = 0 then false // source op that isn't the input
        elif not op.IsLinear then false
        else op.Inputs |> Array.forall (fun dep -> isLinearChainToInput dep inputOp)
Comment on lines +175 to +179

P2: Avoid recursive linearity walk on deep operator chains

`IncrementalAuto` computes linearity with a recursive DFS over `Op.Inputs`. For long generated pipelines (e.g., thousands of chained linear operators) this can overflow the .NET call stack during dispatch, and because the traversal keeps no visited set or memo it also repeats work across shared subgraphs. Capability detection therefore becomes a construction-time crash and performance hazard on large circuits before any tick runs.
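
One way to address this is an explicit-stack traversal with a visited set. The following is a minimal sketch, not the project's code: `Node` is a hypothetical stand-in for the real `Op` that exposes only the `IsLinear` and `Inputs` members the walk reads, and `isLinearChainToInputIterative` is an illustrative name. It applies the same three checks as the recursive version, in the same order.

```fsharp
open System.Collections.Generic

/// Hypothetical stand-in for `Op` (assumption: only these members matter here).
type Node(name: string, isLinear: bool, inputs: Node[]) =
    member _.Name = name
    member _.IsLinear = isLinear
    member _.Inputs = inputs

/// Iterative linearity walk: true only if every path from `op` reaches
/// `inputOp` through linear nodes. Uses a heap-allocated stack instead of
/// the call stack, and visits each shared node once.
let isLinearChainToInputIterative (op: Node) (inputOp: Node) : bool =
    let visited = HashSet<Node>(HashIdentity.Reference)
    let pending = Stack<Node>()
    pending.Push op
    let mutable ok = true
    while ok && pending.Count > 0 do
        let current = pending.Pop()
        if obj.ReferenceEquals(current, inputOp) then
            ()                      // reached the designated input: this path is fine
        elif not (visited.Add current) then
            ()                      // already checked via another path
        elif current.Inputs.Length = 0 || not current.IsLinear then
            ok <- false             // foreign source op, or a non-linear op on the chain
        else
            for dep in current.Inputs do
                pending.Push dep
    ok

// Illustrative graphs (all names below are made up for the example).
let input = Node("input", false, [||])

// 100,000 chained linear nodes: deep enough to threaten a recursive walk.
let deepTip =
    Seq.fold (fun (prev: Node) i -> Node(sprintf "map%d" i, true, [| prev |]))
             input (seq { 1 .. 100000 })

// Map(Distinct(input)): linear terminal op over a non-linear inner op.
let mapOverDistinct =
    Node("map", true, [| Node("distinct", false, [| input |]) |])

printfn "%b" (isLinearChainToInputIterative deepTip input)          // true
printfn "%b" (isLinearChainToInputIterative mapOverDistinct input)  // false
```

Traversal depth is then bounded by heap size rather than the thread's stack, and the visited set keeps the cost linear in the number of reachable operators even when subgraphs are shared.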

    if resultOp.IsSink then
        invalidOp
            (sprintf
                "IncrementalAuto: cannot incrementalize a sink operator. \
                 '%s' (id=%d) declared IsSink=true; sinks are terminal \
                 and excluded from relational composition. Note: the \
                 probe already registered '%s' in this circuit before \
                 this check; the orphan sink will be scheduled and \
                 ticked unless the circuit is discarded. Use a \
                 non-sink operator, or consume the sink's output \
                 directly without incrementalization."
                resultOp.Name resultOp.Id resultOp.Name)
    elif isLinearChainToInput resultOp (input.Op :> Op) then
        // Q^Δ = Q. For *whole-chain* linear q, q(delta) IS the
        // delta of q(full). The probed output is correct as-is;
        // return directly. Distinction from `resultOp.IsLinear`
        // alone: this guards against terminal-linear-but-inner-
        // non-linear queries like `Map(Distinct(s))`.
        probedOutput
    else
        // Generic D ∘ Q ∘ I fallback. The probed op above is orphan
        // in the circuit: wasteful but functionally correct.
        this.IncrementalizeZSet(q, input)


/// F#-idiomatic piping. `stream |> Stream.map f |> Stream.filter p` is
/// equivalent to `circuit.Filter(circuit.Map(stream, f), p)` but reads in
205 changes: 205 additions & 0 deletions tests/Tests.FSharp/Circuit/IncrementalAuto.Tests.fs
@@ -0,0 +1,205 @@
module Zeta.Tests.Circuit.IncrementalAutoTests

/// Tests for `IncrementalAuto` — the capability-aware incremental
/// dispatcher added in PR 4. The dispatcher consumes the `IsLinear`
/// and `IsSink` capability tags on `Op<'T>` (from PR 1) to pick the
/// right incrementalization at construction time:
///
/// - Linear op → `Q^Δ = Q` (deltas pass through unchanged)
/// - Sink op → throw (terminal; can't incrementalize)
/// - Otherwise → fall back to `D ∘ Q ∘ I`
///
/// The end-to-end correctness check: both `IncrementalAuto(q, input)`
/// and the manually-written reference form (`q(input)` for linear,
/// `D∘Q∘I` for fallback) must produce bit-identical output streams
/// across all ticks. We test by running both forms in parallel
/// inside a single circuit.

open System
open System.Threading.Tasks
open Xunit
open FsUnit.Xunit
open Zeta.Core


// ─────────────────────────────────────────────────────────────────
// Helpers — run a circuit a few ticks and capture the output
// ─────────────────────────────────────────────────────────────────

// ─────────────────────────────────────────────────────────────────
// Linear case — Map. IncrementalAuto should return Q directly.
// ─────────────────────────────────────────────────────────────────

[<Fact>]
let ``IncrementalAuto with linear Map produces same delta stream as direct Q`` () =
    let c = Circuit()
    let input = c.ZSetInput<int>()
    let doubleIt = Func<Stream<ZSet<int>>, Stream<ZSet<int>>>(fun s ->
        c.Map(s, Func<int, int>(fun x -> x * 2)))

    // Both ops registered in the same circuit; they share the input stream.
    let reference = doubleIt.Invoke input.Stream // direct: q(delta) for linear is correct
    let subject = c.IncrementalAuto(doubleIt, input.Stream)

    let refHandle = c.Output reference
    let subHandle = c.Output subject

    let deltas =
        [ ZSet.ofSeq [ (1, 1L); (2, 1L) ]
          ZSet.ofSeq [ (3, 1L) ]
          ZSet.ofSeq [ (1, -1L) ]
          ZSet.Empty ]

    for delta in deltas do
        input.Send delta
        c.Step()
        // Each tick, both should produce the same output.
        subHandle.Current |> should equal refHandle.Current


// ─────────────────────────────────────────────────────────────────
// Non-linear case — Distinct. IncrementalAuto should fall back to
// D ∘ Q ∘ I, which equals IncrementalizeZSet's output.
// ─────────────────────────────────────────────────────────────────

[<Fact>]
let ``IncrementalAuto with terminal-linear-but-inner-non-linear chain falls back (Map ∘ Distinct)`` () =
    // Regression test for the bug Codex flagged: checking only the
    // probed terminal op's IsLinear misses non-linear ops inside the
    // chain. `q(s) = Map(Distinct(s))` ends on a linear Map but the
    // composed query is non-linear; the dispatcher MUST fall back to
    // D∘Q∘I to produce correct incremental semantics.
    let c = Circuit()
    let input = c.ZSetInput<int>()
    let mapAfterDistinct =
        Func<Stream<ZSet<int>>, Stream<ZSet<int>>>(fun s ->
            c.Map(c.Distinct s, Func<int, int>(fun x -> x * 10)))

    let reference = c.IncrementalizeZSet(mapAfterDistinct, input.Stream)
    let subject = c.IncrementalAuto(mapAfterDistinct, input.Stream)

    let refHandle = c.Output reference
    let subHandle = c.Output subject

    // Scenario: duplicate insertions across ticks. If the dispatcher
    // incorrectly took the Q^Δ=Q path, the subject would emit the
    // Map of the delta directly each tick. That is wrong because
    // Distinct clamps cumulative state, so the second insertion of
    // key 1 should produce no new output. The reference path (D∘Q∘I)
    // computes this correctly; subject must match.
    let deltas =
        [ ZSet.ofSeq [ (1, 1L) ]   // distinct: {1→1}, mapped: {10→1}; emit Δ {10→+1}
          ZSet.ofSeq [ (1, 1L) ]   // distinct: still {1→1}; mapped same; emit Δ {} (no change)
          ZSet.ofSeq [ (2, 1L) ]   // distinct: {1,2}; mapped: {10,20}; emit Δ {20→+1}
          ZSet.ofSeq [ (1, -2L) ]  // distinct: {2}; mapped: {20}; emit Δ {10→-1}
        ]

    for delta in deltas do
        input.Send delta
        c.Step()
        subHandle.Current |> should equal refHandle.Current


[<Fact>]
let ``IncrementalAuto with non-linear Distinct falls back to D-Q-I`` () =
    let c = Circuit()
    let input = c.ZSetInput<int>()
    let distinctOp = Func<Stream<ZSet<int>>, Stream<ZSet<int>>>(fun s -> c.Distinct s)

    let reference = c.IncrementalizeZSet(distinctOp, input.Stream)
    let subject = c.IncrementalAuto(distinctOp, input.Stream)

    let refHandle = c.Output reference
    let subHandle = c.Output subject

    let deltas =
        [ ZSet.ofSeq [ (1, 1L); (2, 1L) ]
          ZSet.ofSeq [ (1, 1L) ]   // duplicate of 1; distinct should not emit it again
          ZSet.ofSeq [ (3, 1L) ]
          ZSet.ofSeq [ (1, -1L) ]  // retract one instance of 1
          ZSet.ofSeq [ (1, -1L) ]  // retract the last instance; distinct should drop 1
          ZSet.Empty ]

    for delta in deltas do
        input.Send delta
        c.Step()
        subHandle.Current |> should equal refHandle.Current


// ─────────────────────────────────────────────────────────────────
// Sink case — should throw
// ─────────────────────────────────────────────────────────────────

/// A sink that outputs a Z-set (so its output type matches the
/// `IncrementalAuto<'K>` signature). Sink-ness is declarative — the
/// dispatcher should reject before stepping.
type private ZSetSinkOp(input: Stream<ZSet<int>>) =
    let deps = [| input.AsDependency() |]
    interface ISinkOperator<ZSet<int>, ZSet<int>> with
        member _.Name = "test-zset-sink"
        member _.ReadDependencies = deps
        member _.StepAsync(out, _ct) =
            out.Publish input.Current
            ValueTask.CompletedTask


[<Fact>]
let ``IncrementalAuto throws when the operator is a sink`` () =
    let c = Circuit()
    let input = c.ZSetInput<int>()
    let sinkBuilder =
        Func<Stream<ZSet<int>>, Stream<ZSet<int>>>(fun s ->
            c.RegisterStream (ZSetSinkOp s :> IOperator<ZSet<int>>))
    let ex =
        Assert.Throws<InvalidOperationException>(fun () ->
            c.IncrementalAuto(sinkBuilder, input.Stream) |> ignore)
    ex.Message |> should haveSubstring "IncrementalAuto"
    ex.Message |> should haveSubstring "sink"
    ex.Message |> should haveSubstring "test-zset-sink"


// ─────────────────────────────────────────────────────────────────
// Structural fast-path verification — the linear case should take
// the passthrough dispatch (no Integrate, no Differentiate), so the
// number of operators registered after `IncrementalAuto` matches
// the count from a direct `q.Invoke` (one operator: just the Map).
//
// This test verifies the dispatch path via operator-count delta —
// the indirect-but-deterministic signal that the fast path was
// taken. A stronger reference-equality assertion (the returned
// Stream's underlying Op IS the probed Op) would require exposing
// the internal `Stream.Op` accessor, which is internal-only today.
// ─────────────────────────────────────────────────────────────────

[<Fact>]
let ``IncrementalAuto with linear op adds exactly one operator (passthrough, no D-I)`` () =
    let c = Circuit()
    let input = c.ZSetInput<int>()
    let countBefore = c.OperatorCount
    let _ =
        c.IncrementalAuto(
            Func<Stream<ZSet<int>>, Stream<ZSet<int>>>(fun s ->
                c.Map(s, Func<int, int>(fun x -> x * 2))),
            input.Stream)
    let countAfter = c.OperatorCount
    // Linear path: only the Map op registers. No Integrate, no Differentiate.
    (countAfter - countBefore) |> should equal 1


[<Fact>]
let ``IncrementalAuto with non-linear op adds four operators (probe-orphan + Integrate + new Q + Differentiate)`` () =
    let c = Circuit()
    let input = c.ZSetInput<int>()
    let countBefore = c.OperatorCount
    let _ =
        c.IncrementalAuto(
            Func<Stream<ZSet<int>>, Stream<ZSet<int>>>(fun s -> c.Distinct s),
            input.Stream)
    let countAfter = c.OperatorCount
    // Fallback path registers:
    //   1. Probe q.Invoke(input) → orphan Distinct
    //   2. IntegrateZSet(input)
    //   3. q.Invoke(integrated) → second Distinct
    //   4. DifferentiateZSet(processed)
    // Total: 4 new operators (one is orphan, see IncrementalAuto docstring).
    (countAfter - countBefore) |> should equal 4
1 change: 1 addition & 0 deletions tests/Tests.FSharp/Tests.FSharp.fsproj
@@ -37,6 +37,7 @@
<Compile Include="Circuit/Plan.Tests.fs" />
<Compile Include="Circuit/Plan.Branches.Tests.fs" />
<Compile Include="Circuit/SinkTerminality.Tests.fs" />
<Compile Include="Circuit/IncrementalAuto.Tests.fs" />

<!-- Operators/ -->
<Compile Include="Operators/Aggregate.Tests.fs" />