diff --git a/src/Core/Incremental.fs b/src/Core/Incremental.fs index 89fb127e79..f653d56363 100644 --- a/src/Core/Incremental.fs +++ b/src/Core/Incremental.fs @@ -77,6 +77,130 @@ type IncrementalExtensions = let prev = this.DelayZSet integrated // z^-1(I(Δa)) = snapshot before this tick this.DistinctIncremental(prev, delta) + /// **Capability-aware incremental dispatcher.** Picks the right + /// incrementalization for `q` based on the algebra capability tag + /// on the operator `q` produces: + /// + /// - `IsLinear = true` → `Q^Δ = Q` (trivial; deltas pass through) + /// - `IsSink = true` → throws (sinks are terminal; can't be + /// incrementalized — see PR 2's + /// sink-terminality enforcement) + /// - otherwise → `D ∘ Q ∘ I` fallback (always correct + /// but expensive — materializes the full + /// integrated relation every tick) + /// + /// ## How the dispatch works (and important side-effects) + /// + /// `q` is a `Stream → Stream` factory. We **probe** it by invoking + /// `q.Invoke input` *before* deciding the dispatch path; the + /// resulting `Stream<'TOut>.Op` carries the capability tags from + /// PR 1. The probe is then either: + /// + /// - **Linear case**: probed output returned directly (correct + /// because linear ops commute with `D`/`I`); no orphan. + /// - **Sink case**: throws — but the sink operator has *already + /// been registered* in the circuit by the probe `q.Invoke`. + /// - **Fallback case**: probed op kept in circuit as **orphan** + /// (no consumers); fallback rebuilds via `IncrementalizeZSet`. + /// + /// ## ⚠️ Side-effect warning + /// + /// **The probe `q.Invoke input` ALWAYS runs once, regardless of + /// dispatch path.** This has two implications callers must + /// account for: + /// + /// 1. **Side-effecting builders execute**: if `q` is a factory + /// like `AdvancedExtensions.Inspect` that performs work at + /// construction time (logging, registering hooks, allocating + /// external resources), that work runs **even when the + /// dispatcher throws or falls back**. Use a non-side-effecting + /// `q` factory, or accept the probe-side-effect cost. + /// 2. **Circuit is mutated even on the throw path**: when the + /// sink case fires, the sink operator is **already in the + /// circuit** when the exception is raised. The caller's + /// circuit has been modified; catching the exception and + /// continuing leaves an orphan sink that will still be + /// scheduled and ticked. Either don't catch + recover from + /// `IncrementalAuto` exceptions on a circuit you'll continue + /// to use, or accept that the sink op stays registered. + /// + /// A non-registering probe path would close both gaps but requires + /// architectural changes to the `Op` / `Circuit` contract + /// (registration rollback isn't currently supported). Tracked as + /// future work; the dispatcher's correctness on the happy paths + /// (linear + fallback) doesn't depend on it. + /// + /// ## Known cost: orphan operator in the non-linear fallback path + /// + /// When the fallback fires, the probed operator stays registered + /// in the circuit with no consumers — the scheduler will still tick + /// it every cycle (wasted work), but it produces no observable + /// output. Pruning unreachable operators at `Circuit.Build()` time + /// is a future improvement; the dispatcher's correctness doesn't + /// depend on it. + /// + /// ## When NOT to use this + /// + /// For bilinear joins, use `IncrementalJoin` directly — it has a + /// richer signature (key functions, combine function) that this + /// unary dispatcher can't express. A `IncrementalAutoJoin` + /// dispatcher for bilinear ops can be added in a later PR. + /// + /// ## Reads + /// + /// `Op.IsLinear` and `Op.IsSink` (PR 1). For internal operators + /// these are overridden in the concrete subclasses; for plugin + /// operators they're surfaced via the `PluginOperatorAdapter` + /// marker detection. + [] + static member IncrementalAuto<'K when 'K : comparison> + (this: Circuit, + q: Func>, Stream>>, + input: Stream>) : Stream> = + // Probe: apply q to input directly to inspect the resulting + // operator's capability tag. Registration is a side effect; + // see "orphan operator" note above. + let probedOutput = q.Invoke input + let resultOp = probedOutput.Op + // Check the WHOLE CHAIN from the probed terminal op back to + // the original input, not just the terminal op's IsLinear + // tag. A query like `q(s) = Map(Distinct(s))` has a terminal + // Map (IsLinear=true) but a non-linear Distinct inside; the + // composed query is NOT linear and the Q^Δ = Q rewrite would + // produce wrong incremental results. Walk Inputs back to + // `input.Op`: if every op in the chain is linear AND we reach + // `input.Op` only through linear ops, the chain is linear. + // Multi-input ops (Plus, Minus — currently default + // IsLinear=false) correctly fall back via this check. + let rec isLinearChainToInput (op: Op) (inputOp: Op) : bool = + if System.Object.ReferenceEquals(op, inputOp) then true + elif op.Inputs.Length = 0 then false // source op that isn't the input + elif not op.IsLinear then false + else op.Inputs |> Array.forall (fun dep -> isLinearChainToInput dep inputOp) + if resultOp.IsSink then + invalidOp + (sprintf + "IncrementalAuto: cannot incrementalize a sink operator. \ + '%s' (id=%d) declared IsSink=true; sinks are terminal \ + and excluded from relational composition. Note: the \ + probe already registered '%s' in this circuit before \ + this check; the orphan sink will be scheduled and \ + ticked unless the circuit is discarded. Use a \ + non-sink operator, or consume the sink's output \ + directly without incrementalization." + resultOp.Name resultOp.Id resultOp.Name) + elif isLinearChainToInput resultOp (input.Op :> Op) then + // Q^Δ = Q. For *whole-chain* linear q, q(delta) IS the + // delta of q(full). The probed output is correct as-is; + // return directly. Distinction from `resultOp.IsLinear` + // alone: this guards against terminal-linear-but-inner- + // non-linear queries like `Map(Distinct(s))`. + probedOutput + else + // Generic D ∘ Q ∘ I fallback. The probed op above is orphan + // in the circuit — wasteful but functionally correct. + this.IncrementalizeZSet(q, input) + /// F#-idiomatic piping. `stream |> Stream.map f |> Stream.filter p` is /// equivalent to `circuit.Filter(circuit.Map(stream, f), p)` but reads in diff --git a/tests/Tests.FSharp/Circuit/IncrementalAuto.Tests.fs b/tests/Tests.FSharp/Circuit/IncrementalAuto.Tests.fs new file mode 100644 index 0000000000..7b937e115d --- /dev/null +++ b/tests/Tests.FSharp/Circuit/IncrementalAuto.Tests.fs @@ -0,0 +1,205 @@ +module Zeta.Tests.Circuit.IncrementalAutoTests + +/// Tests for `IncrementalAuto` — the capability-aware incremental +/// dispatcher added in PR 4. The dispatcher consumes the `IsLinear` +/// and `IsSink` capability tags on `Op<'T>` (from PR 1) to pick the +/// right incrementalization at construction time: +/// +/// - Linear op → `Q^Δ = Q` (deltas pass through unchanged) +/// - Sink op → throw (terminal; can't incrementalize) +/// - Otherwise → fall back to `D ∘ Q ∘ I` +/// +/// The end-to-end correctness check: both `IncrementalAuto(q, input)` +/// and the manually-written reference form (`q(input)` for linear, +/// `D∘Q∘I` for fallback) must produce bit-identical output streams +/// across all ticks. We test by running both forms in parallel +/// inside a single circuit. + +open System +open System.Threading.Tasks +open Xunit +open FsUnit.Xunit +open Zeta.Core + + +// ───────────────────────────────────────────────────────────────── +// Helpers — run a circuit a few ticks and capture the output +// ───────────────────────────────────────────────────────────────── + +// ───────────────────────────────────────────────────────────────── +// Linear case — Map. IncrementalAuto should return Q directly. +// ───────────────────────────────────────────────────────────────── + +[] +let ``IncrementalAuto with linear Map produces same delta stream as direct Q`` () = + let c = Circuit() + let input = c.ZSetInput() + let doubleIt = Func>, Stream>>(fun s -> + c.Map(s, Func(fun x -> x * 2))) + + // Both ops registered in the same circuit; they share the input stream. + let reference = doubleIt.Invoke input.Stream // direct: q(delta) for linear is correct + let subject = c.IncrementalAuto(doubleIt, input.Stream) + + let refHandle = c.Output reference + let subHandle = c.Output subject + + let deltas = + [ ZSet.ofSeq [ (1, 1L); (2, 1L) ] + ZSet.ofSeq [ (3, 1L) ] + ZSet.ofSeq [ (1, -1L) ] + ZSet.Empty ] + + for delta in deltas do + input.Send delta + c.Step() + // Each tick, both should produce the same output. + subHandle.Current |> should equal refHandle.Current + + +// ───────────────────────────────────────────────────────────────── +// Non-linear case — Distinct. IncrementalAuto should fall back to +// D ∘ Q ∘ I, which equals IncrementalizeZSet's output. +// ───────────────────────────────────────────────────────────────── + +[] +let ``IncrementalAuto with terminal-linear-but-inner-non-linear chain falls back (Map ∘ Distinct)`` () = + // Regression test for the bug Codex flagged: checking only the + // probed terminal op's IsLinear misses non-linear ops inside the + // chain. `q(s) = Map(Distinct(s))` ends on a linear Map but the + // composed query is non-linear; the dispatcher MUST fall back to + // D∘Q∘I to produce correct incremental semantics. + let c = Circuit() + let input = c.ZSetInput() + let mapAfterDistinct = + Func>, Stream>>(fun s -> + c.Map(c.Distinct s, Func(fun x -> x * 10))) + + let reference = c.IncrementalizeZSet(mapAfterDistinct, input.Stream) + let subject = c.IncrementalAuto(mapAfterDistinct, input.Stream) + + let refHandle = c.Output reference + let subHandle = c.Output subject + + // Scenario: duplicate insertions across ticks. If the dispatcher + // incorrectly took the Q^Δ=Q path, the subject would emit the + // Map of the delta directly each tick — wrong because Distinct + // clamps cumulative state, so the second insertion of key 1 + // should produce no new output. The reference path (D∘Q∘I) + // computes this correctly; subject must match. + let deltas = + [ ZSet.ofSeq [ (1, 1L) ] // distinct: {1→1}, mapped: {10→1}; emit Δ {10→+1} + ZSet.ofSeq [ (1, 1L) ] // distinct: still {1→1}; mapped same; emit Δ {} (no change) + ZSet.ofSeq [ (2, 1L) ] // distinct: {1,2}; mapped: {10,20}; emit Δ {20→+1} + ZSet.ofSeq [ (1, -2L) ] // distinct: {2}; mapped: {20}; emit Δ {10→-1} + ] + + for delta in deltas do + input.Send delta + c.Step() + subHandle.Current |> should equal refHandle.Current + + +[] +let ``IncrementalAuto with non-linear Distinct falls back to D-Q-I`` () = + let c = Circuit() + let input = c.ZSetInput() + let distinctOp = Func>, Stream>>(fun s -> c.Distinct s) + + let reference = c.IncrementalizeZSet(distinctOp, input.Stream) + let subject = c.IncrementalAuto(distinctOp, input.Stream) + + let refHandle = c.Output reference + let subHandle = c.Output subject + + let deltas = + [ ZSet.ofSeq [ (1, 1L); (2, 1L) ] + ZSet.ofSeq [ (1, 1L) ] // duplicate of 1; distinct should not emit it again + ZSet.ofSeq [ (3, 1L) ] + ZSet.ofSeq [ (1, -1L) ] // retract one instance of 1 + ZSet.ofSeq [ (1, -1L) ] // retract the last instance — distinct should drop 1 + ZSet.Empty ] + + for delta in deltas do + input.Send delta + c.Step() + subHandle.Current |> should equal refHandle.Current + + +// ───────────────────────────────────────────────────────────────── +// Sink case — should throw +// ───────────────────────────────────────────────────────────────── + +/// A sink that outputs a Z-set (so its output type matches the +/// `IncrementalAuto<'K>` signature). Sink-ness is declarative — the +/// dispatcher should reject before stepping. +type private ZSetSinkOp(input: Stream>) = + let deps = [| input.AsDependency() |] + interface ISinkOperator, ZSet> with + member _.Name = "test-zset-sink" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +[] +let ``IncrementalAuto throws when the operator is a sink`` () = + let c = Circuit() + let input = c.ZSetInput() + let sinkBuilder = + Func>, Stream>>(fun s -> + c.RegisterStream (ZSetSinkOp s :> IOperator>)) + let ex = + Assert.Throws(fun () -> + c.IncrementalAuto(sinkBuilder, input.Stream) |> ignore) + ex.Message |> should haveSubstring "IncrementalAuto" + ex.Message |> should haveSubstring "sink" + ex.Message |> should haveSubstring "test-zset-sink" + + +// ───────────────────────────────────────────────────────────────── +// Structural fast-path verification — the linear case should take +// the passthrough dispatch (no Integrate, no Differentiate), so the +// number of operators registered after `IncrementalAuto` matches +// the count from a direct `q.Invoke` (one operator: just the Map). +// +// This test verifies the dispatch path via operator-count delta — +// the indirect-but-deterministic signal that the fast path was +// taken. A stronger reference-equality assertion (the returned +// Stream's underlying Op IS the probed Op) would require exposing +// the internal `Stream.Op` accessor, which is internal-only today. +// ───────────────────────────────────────────────────────────────── + +[] +let ``IncrementalAuto with linear op adds exactly one operator (passthrough, no D-I)`` () = + let c = Circuit() + let input = c.ZSetInput() + let countBefore = c.OperatorCount + let _ = + c.IncrementalAuto( + Func>, Stream>>(fun s -> + c.Map(s, Func(fun x -> x * 2))), + input.Stream) + let countAfter = c.OperatorCount + // Linear path: only the Map op registers. No Integrate, no Differentiate. + (countAfter - countBefore) |> should equal 1 + + +[] +let ``IncrementalAuto with non-linear op adds four operators (probe-orphan + Integrate + new Q + Differentiate)`` () = + let c = Circuit() + let input = c.ZSetInput() + let countBefore = c.OperatorCount + let _ = + c.IncrementalAuto( + Func>, Stream>>(fun s -> c.Distinct s), + input.Stream) + let countAfter = c.OperatorCount + // Fallback path registers: + // 1. Probe q.Invoke(input) → orphan Distinct + // 2. IntegrateZSet(input) + // 3. q.Invoke(integrated) → second Distinct + // 4. DifferentiateZSet(processed) + // Total: 4 new operators (one is orphan, see IncrementalAuto docstring). + (countAfter - countBefore) |> should equal 4 diff --git a/tests/Tests.FSharp/Tests.FSharp.fsproj b/tests/Tests.FSharp/Tests.FSharp.fsproj index 1a8fc8ad69..769209a75c 100644 --- a/tests/Tests.FSharp/Tests.FSharp.fsproj +++ b/tests/Tests.FSharp/Tests.FSharp.fsproj @@ -37,6 +37,7 @@ +