From 274455aa2df805681584edc828170dcd33092059 Mon Sep 17 00:00:00 2001 From: Aaron Stainback Date: Thu, 21 May 2026 13:21:13 -0400 Subject: [PATCH 1/2] feat(core): promote algebra capability tags from plugin-marker interfaces onto Op<'T> base class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 1 of an 8-PR campaign that wires the algebra-capability system from declarative-but-unenforced markers into a load-bearing, uniformly-detected property surface on every operator (internal + plugin). ## What changes `Op` base class (Circuit.fs) gains four abstract properties — `IsLinear`, `IsBilinear`, `IsSink`, `IsStatefulStrict` — each defaulting to `false`. Concrete operators override only the capabilities they actually have. Until this change, the algebra tags lived ONLY as plugin marker interfaces in PluginApi.fs and were ignored by `PluginOperatorAdapter` (which detected `IStrictOperator`/`IAsyncOperator`/`INestedFixpointParticipant` but not the algebra markers). That asymmetry meant: - Internal operators (MapZSetOp, JoinZSetOp, etc.) had no capability surface at all — algebra was implicit-by-code-shape. - Plugin operators declared capabilities via marker interfaces but `PluginOperatorAdapter` discarded the declarations. - Consumers (Incremental.IncrementalJoin, future Fusion/IncrementalAuto) had no uniform way to ask "is this operator linear?" without custom type tests per call site. ## Non-generic marker pattern F# generic-interface tests require exact type-parameter match — `(box plugin) :? IBilinearOperator` against a concrete `IBilinearOperator` returns false. The fix is the BCL `IEnumerable` / `IEnumerable` pattern: a non-generic marker interface (`ILinearMarker`, `IBilinearMarker`, `ISinkMarker`, `IStatefulStrictMarker`) for runtime `:?` tests, and the typed interface inheriting the marker. Plugin authors continue implementing the typed interface; the marker is satisfied automatically via interface inheritance. `PluginOperatorAdapter` now caches one `:?` check per marker at construction (zero per-tick cost) and surfaces the results through the new `Op` overrides. ## Internal-operator overrides | Operator | Capability | Reasoning | |---|---|---| | MapZSetOp, FilterZSetOp, FlatMapZSetOp, NegZSetOp | IsLinear=true | Z-set algebra: distributes over addition, op(0)=0 | | IndexWithOp | IsLinear=true | Indexing distributes over per-key value-group sum | | JoinZSetOp, CartesianZSetOp, IndexedJoinOp | IsBilinear=true | Weights multiply; per-arg linear; op(0,b)=op(a,0)=0 | | DelayOp, IntegrateOp, DifferentiateOp | IsLinear=true | Time-shift / running-sum / difference commute with group | | FilterMapOp, FilterMapOptionalOp | IsLinear=true | Composition of linear ops | | PlusZSetOp, MinusZSetOp | (default false) | Additive but NOT unary-linear: Plus(0,b)=b≠0 | | DistinctZSetOp, DistinctIncrementalOp | (default false) | Clamps weights — breaks linearity | | GroupBySumOp | (default false) | Output keys depend on summed weights, breaks linearity | | ConstantOp | (default false) | Affine; const_c(0)=c≠0 unless c=0 | ## Tests 21 new tests in `tests/Tests.FSharp/Plugin/Capabilities.Tests.fs`: - 15 internal-operator capability tests (one per named op) - 5 plugin-marker-detection tests via PluginOperatorAdapter - 1 negative test: plain IOperator plugin reports all caps false All 31 plugin tests pass (10 pre-existing + 21 new); 480 / 481 broader operator/algebra/circuit tests pass (1 SKIP is pre-existing). Build clean: 0 warnings, 0 errors on full solution Release build. ## Foundation for PRs 2-8 This is the load-bearing dependency for: - PR 2: Circuit.Build() consults IsSink for terminal-placement enforcement (the docstring promise that's currently vapor). - PR 4: IncrementalAuto dispatcher reads IsLinear/IsBilinear to pick Q^Δ=Q vs three-term-bilinear vs D∘Q∘I fallback. - PR 5: FusionEngine composes capability tags through DAG rewrite. - PRs 6-8: push/morsel/codegen architectures all need uniform capability surfacing to dispatch correctly. No public-API breakage: the marker interfaces still work the same way for plugin authors; the new Op-base-class properties are purely additive. --- src/Core/Circuit.fs | 46 +++ src/Core/Fusion.fs | 7 + src/Core/Operators.fs | 25 ++ src/Core/PluginApi.fs | 62 ++- src/Core/Primitive.fs | 14 + .../Tests.FSharp/Plugin/Capabilities.Tests.fs | 389 ++++++++++++++++++ tests/Tests.FSharp/Tests.FSharp.fsproj | 1 + 7 files changed, 539 insertions(+), 5 deletions(-) create mode 100644 tests/Tests.FSharp/Plugin/Capabilities.Tests.fs diff --git a/src/Core/Circuit.fs b/src/Core/Circuit.fs index 391c45e4a9..d72fda5296 100644 --- a/src/Core/Circuit.fs +++ b/src/Core/Circuit.fs @@ -42,6 +42,52 @@ type Op() = abstract IsAsync: bool default _.IsAsync = false + // ───────────────────────────────────────────────────────────────── + // Algebra capability tags. Promoted from plugin-only marker + // interfaces (PluginApi.fs) to first-class fields on the Op base + // class so internal operators and plugin operators declare + // capabilities through the same surface. The scheduler, fusion + // engine, and incremental-rewriter dispatcher all consult these + // fields — they're load-bearing for capability-aware optimization, + // not decorative. + // + // Default false; each concrete operator overrides only the + // capabilities it actually has. Lying about a tag is an algebraic + // contract violation — LawRunner can verify each in test mode. + // ───────────────────────────────────────────────────────────────── + + /// Algebra capability: operator is *linear* — `op(a + b) = op(a) + + /// op(b)` and `op(0) = 0`. Retraction-native: a negative-weight + /// input un-accumulates correctly. `IncrementalAuto` uses this to + /// emit `Q^Δ = Q` (linear operators incrementalize trivially). + abstract IsLinear: bool + default _.IsLinear = false + + /// Algebra capability: operator is *bilinear* in its two inputs. + /// `op(a₁+a₂, b) = op(a₁, b) + op(a₂, b)` and symmetrically for the + /// second argument; additionally `op(0, b) = op(a, 0) = 0`. + /// `IncrementalAuto` uses this to emit the three-term incremental + /// form `Δa ⋈ Δb + z⁻¹(I(a)) ⋈ Δb + Δa ⋈ z⁻¹(I(b))`. + abstract IsBilinear: bool + default _.IsBilinear = false + + /// Algebra capability: operator is a *sink* — terminal, + /// retraction-lossy, may emit a non-Z-set output. Sinks are + /// excluded from relational composition: `Circuit.Build()` rejects + /// any operator that reads from a sink's output stream (terminal- + /// placement enforcement). Bayesian aggregates and external-system + /// sinks are canonical examples. + abstract IsSink: bool + default _.IsSink = false + + /// Algebra capability: operator carries explicit stateful-strict + /// semantics — init/step/retract triple — distinct from `IsStrict` + /// (feedback-cut). Stateful-strict operators hold per-key state + /// that must retract cleanly when a negative-weight input arrives. + /// `LawRunner.checkRetractionCompleteness` verifies the claim. + abstract IsStatefulStrict: bool + default _.IsStatefulStrict = false + /// An operator with a typed output slot. /// diff --git a/src/Core/Fusion.fs b/src/Core/Fusion.fs index e2c8aa9594..899f09d8ac 100644 --- a/src/Core/Fusion.fs +++ b/src/Core/Fusion.fs @@ -17,6 +17,10 @@ type internal FilterMapOp<'A, 'B when 'A : comparison and 'B : comparison> let inputs = [| input :> Op |] override _.Name = "filterMap" override _.Inputs = inputs + /// Linear: filter ∘ map is the composition of two linear ops — + /// distributes over Z-set addition. The fused implementation + /// preserves linearity by construction. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = let span = input.Value.AsSpan() if span.IsEmpty then @@ -50,6 +54,9 @@ type internal FilterMapOptionalOp<'A, 'B when 'A : comparison and 'B : compariso let inputs = [| input :> Op |] override _.Name = "filterMap" override _.Inputs = inputs + /// Linear: same reasoning as FilterMapOp — the choose-shaped + /// `pickMap` is equivalent to a filter+map composition. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = let span = input.Value.AsSpan() if span.IsEmpty then diff --git a/src/Core/Operators.fs b/src/Core/Operators.fs index d218eb1c61..a5628282fd 100644 --- a/src/Core/Operators.fs +++ b/src/Core/Operators.fs @@ -13,6 +13,10 @@ type internal MapZSetOp<'A, 'B when 'A : comparison and 'B : comparison>(input: let inputs = [| input :> Op |] override _.Name = "map" override _.Inputs = inputs + /// Linear: ZSet.map f distributes over Z-set addition because + /// `f` rewrites keys without touching weights, and equal keys sum + /// their weights linearly. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = this.Value <- ZSet.map f.Invoke input.Value ValueTask.CompletedTask @@ -24,6 +28,8 @@ type internal FilterZSetOp<'K when 'K : comparison>(input: Op>, predica let inputs = [| input :> Op |] override _.Name = "filter" override _.Inputs = inputs + /// Linear: predicate is weight-independent, so filter(a+b) = filter(a)+filter(b). + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = this.Value <- ZSet.filter predicate.Invoke input.Value ValueTask.CompletedTask @@ -36,6 +42,9 @@ type internal FlatMapZSetOp<'A, 'B when 'A : comparison and 'B : comparison> let inputs = [| input :> Op |] override _.Name = "flatMap" override _.Inputs = inputs + /// Linear: ZSet.flatMap scales `f k` by the input entry's weight + /// before accumulating — distributes over Z-set addition. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = this.Value <- ZSet.flatMap f.Invoke input.Value ValueTask.CompletedTask @@ -69,6 +78,8 @@ type internal NegZSetOp<'K when 'K : comparison>(a: Op>) = let inputs = [| a :> Op |] override _.Name = "neg" override _.Inputs = inputs + /// Linear: -(a + b) = -a + -b and -0 = 0. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = this.Value <- ZSet.neg a.Value ValueTask.CompletedTask @@ -108,6 +119,10 @@ type internal JoinZSetOp<'A, 'B, 'K, 'C let inputs = [| a :> Op; b :> Op |] override _.Name = "join" override _.Inputs = inputs + /// Bilinear: (a₁+a₂) ⋈ b = (a₁ ⋈ b) + (a₂ ⋈ b), symmetric in b, + /// and 0 ⋈ b = a ⋈ 0 = 0. IncrementalAuto rewrites this to the + /// three-term form `Δa ⋈ Δb + z⁻¹(I(a)) ⋈ Δb + Δa ⋈ z⁻¹(I(b))`. + override _.IsBilinear = true override this.StepAsync(_: CancellationToken) = this.Value <- ZSet.join @@ -125,6 +140,9 @@ type internal CartesianZSetOp<'A, 'B when 'A : comparison and 'B : comparison> let inputs = [| a :> Op; b :> Op |] override _.Name = "cartesian" override _.Inputs = inputs + /// Bilinear: weights multiply (Checked.* in ZSet.cartesian); the + /// product distributes over Z-set addition in each argument. + override _.IsBilinear = true override this.StepAsync(_: CancellationToken) = this.Value <- ZSet.cartesian a.Value b.Value ValueTask.CompletedTask @@ -179,6 +197,10 @@ type internal IndexWithOp<'A, 'K, 'V let inputs = [| input :> Op |] override _.Name = "indexWith" override _.Inputs = inputs + /// Linear: indexing distributes over Z-set addition because the + /// (key, value) extraction is weight-independent and the + /// per-key value groups sum via ZSet.add. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = this.Value <- IndexedZSet.indexWith key.Invoke value.Invoke input.Value ValueTask.CompletedTask @@ -194,6 +216,9 @@ type internal IndexedJoinOp<'K, 'VA, 'VB, 'C let inputs = [| a :> Op; b :> Op |] override _.Name = "indexedJoin" override _.Inputs = inputs + /// Bilinear: per-key value-group cartesian; weights multiply + /// (Checked.* in IndexedZSet.join); distributes per-arg. + override _.IsBilinear = true override this.StepAsync(_: CancellationToken) = this.Value <- IndexedZSet.join diff --git a/src/Core/PluginApi.fs b/src/Core/PluginApi.fs index e36eed8da1..cde3892845 100644 --- a/src/Core/PluginApi.fs +++ b/src/Core/PluginApi.fs @@ -95,21 +95,54 @@ type INestedFixpointParticipant = abstract Fixedpoint : scope: int -> bool +// ───────────────────────────────────────────────────────────────────── +// Algebra capability tags — non-generic markers + typed interfaces. +// +// F# (and the CLR) cannot test `:? IBilinearOperator` +// against a concrete `IBilinearOperator<'A, 'B, 'C>` instance because +// generic-interface type tests require exact type-parameter match. +// The fix is the BCL pattern used by `IEnumerable` vs `IEnumerable`: +// a non-generic marker interface for runtime type tests, and a +// generic interface (inheriting the marker) for typed access. +// +// Plugin authors only need to implement the typed interface; the +// marker is satisfied automatically via interface inheritance. The +// scheduler / fusion engine / IncrementalAuto dispatcher tests +// against the marker, not the generic. +// ───────────────────────────────────────────────────────────────────── + +/// Non-generic marker for `ILinearOperator<_, _>`. Used by +/// `PluginOperatorAdapter` and the scheduler for `:?` runtime tests +/// that don't need exact generic-parameter match. +type ILinearMarker = interface end + +/// Non-generic marker for `IBilinearOperator<_, _, _>`. +type IBilinearMarker = interface end + +/// Non-generic marker for `ISinkOperator<_, _>`. +type ISinkMarker = interface end + +/// Non-generic marker for `IStatefulStrictOperator<_, _, _>`. +type IStatefulStrictMarker = interface end + + /// Algebra capability: the operator is *linear* — `op(a + b) = /// op(a) + op(b)` and `op(0) = 0`. Retraction-native: a /// negative weight un-accumulates correctly. Declared at the /// type level so the scheduler can run `LinearLaw` at -/// `Circuit.Build()`. +/// `Circuit.Build()` (test-time, via `LawRunner.checkLinear`). type ILinearOperator<'TIn, 'TOut> = inherit IOperator<'TOut> + inherit ILinearMarker /// Algebra capability: the operator is *bilinear* in two /// inputs (e.g. a join). Incrementalisation generates the /// standard `Δa ⋈ Δb + z^-1(I(a)) ⋈ Δb + Δa ⋈ z^-1(I(b))` -/// form. +/// form. Verified by `LawRunner.checkBilinear` (when available). type IBilinearOperator<'TIn1, 'TIn2, 'TOut> = inherit IOperator<'TOut> + inherit IBilinearMarker /// Algebra capability: the operator is a *sink* — terminal, @@ -117,19 +150,22 @@ type IBilinearOperator<'TIn1, 'TIn2, 'TOut> = /// operators are consciously exempt from relational /// composition laws and the scheduler enforces terminal /// placement (a sink may not feed another operator inside a -/// relational path). Bayesian aggregates are the canonical -/// example. +/// relational path) via the `Circuit.Build()` validation pass. +/// Bayesian aggregates are the canonical example. type ISinkOperator<'TIn, 'TOut> = inherit IOperator<'TOut> + inherit ISinkMarker /// Algebra capability: the operator carries explicit stateful /// strict semantics — init / step / retract triple. Distinct /// from `IStrictOperator` (feedback-cut): stateful-strict ops /// hold per-key or per-instance state that must retract -/// cleanly when a negative weight arrives. +/// cleanly when a negative weight arrives. Verified by +/// `LawRunner.checkRetractionCompleteness`. type IStatefulStrictOperator<'TIn, 'TState, 'TOut> = inherit IOperator<'TOut> + inherit IStatefulStrictMarker /// Internal adapter: wraps an `IOperator<'T>` inside an @@ -164,6 +200,17 @@ type internal PluginOperatorAdapter<'TOut>(plugin: IOperator<'TOut>, inputOps: O | :? INestedFixpointParticipant as f -> Some f | _ -> None + // Algebra capability detection via non-generic markers. The typed + // interfaces (`ILinearOperator<_,_>` etc.) inherit from these + // markers, so a plugin implementing `ILinearOperator` + // satisfies `ILinearMarker` automatically. Cached once at + // construction; the adapter pays zero per-tick cost for capability + // surfacing. + let isLinearCap = (box plugin) :? ILinearMarker + let isBilinearCap = (box plugin) :? IBilinearMarker + let isSinkCap = (box plugin) :? ISinkMarker + let isStatefulStrictCap = (box plugin) :? IStatefulStrictMarker + member internal _.PublishCount = publishCount override _.Name = plugin.Name @@ -177,6 +224,11 @@ type internal PluginOperatorAdapter<'TOut>(plugin: IOperator<'TOut>, inputOps: O | Some a -> a.IsAsync | None -> false + override _.IsLinear = isLinearCap + override _.IsBilinear = isBilinearCap + override _.IsSink = isSinkCap + override _.IsStatefulStrict = isStatefulStrictCap + override this.StepAsync(ct: CancellationToken) : ValueTask = let buffer = OutputBuffer<'TOut>(this, publishCount) plugin.StepAsync(buffer, ct) diff --git a/src/Core/Primitive.fs b/src/Core/Primitive.fs index d64ac723b0..17f3a9348f 100644 --- a/src/Core/Primitive.fs +++ b/src/Core/Primitive.fs @@ -14,6 +14,11 @@ type internal DelayOp<'T>(input: Op<'T>, initial: 'T) = override _.Name = "z^-1" override _.Inputs = inputs override _.IsStrict = true + /// Linear: `z⁻¹` is a time-shift; it distributes over addition + /// trivially when `initial = 0` for the group. Callers passing a + /// non-zero initial are responsible for the resulting affine + /// offset — DBSP usage always passes the group zero. + override _.IsLinear = true override this.StepAsync(_: CancellationToken) = this.Value <- state ValueTask.CompletedTask @@ -35,6 +40,11 @@ type internal IntegrateOp<'T>(input: Op<'T>, zero: 'T, add: Func<'T, 'T, 'T>) = let mutable state = zero override _.Name = "integrate" override _.Inputs = inputs + /// Linear: `I` is the running sum operator; it commutes with the + /// group operation by associativity/commutativity of the supplied + /// `add` function. Caller is responsible for passing a true + /// group `(zero, add)` — Z-set's `(Empty, ZSet.add)` qualifies. + override _.IsLinear = true // Integration is causal but NOT strict (per the DBSP paper, §2.18). The // scheduler runs us in topological order *after* `input`, so at StepAsync // time `input.Value` is the current tick's delta. We update state and @@ -52,6 +62,10 @@ type internal DifferentiateOp<'T>(input: Op<'T>, zero: 'T, sub: Func<'T, 'T, 'T> let mutable prev = zero override _.Name = "differentiate" override _.Inputs = inputs + /// Linear: `D` distributes over the group operation by linearity + /// of `sub` on an abelian group. Inverse of `IntegrateOp` (modulo + /// initial conditions). + override _.IsLinear = true // IsStrict remains false — non-strict ops run in topo order so by the time // we execute, the input operator's `Value` reflects the current tick. override this.StepAsync(_: CancellationToken) = diff --git a/tests/Tests.FSharp/Plugin/Capabilities.Tests.fs b/tests/Tests.FSharp/Plugin/Capabilities.Tests.fs new file mode 100644 index 0000000000..028c7cc312 --- /dev/null +++ b/tests/Tests.FSharp/Plugin/Capabilities.Tests.fs @@ -0,0 +1,389 @@ +module Zeta.Tests.Plugin.CapabilitiesTests + +/// Tests for the algebra-capability tags lifted from plugin-only +/// marker interfaces (`ILinearOperator` etc. in `PluginApi.fs`) onto +/// the `Op` base class in `Circuit.fs`. Three coverage axes: +/// +/// 1. Internal operators (registered via `Circuit.Map` / +/// `Circuit.Join` / ...) declare the correct capability via +/// direct override on the concrete `Op<'T>` subclass. +/// 2. Plugin operators (registered via the `IOperator<'T>` +/// extension) have their capability detected through the +/// non-generic markers (`ILinearMarker`, `IBilinearMarker`, +/// `ISinkMarker`, `IStatefulStrictMarker`) — the runtime +/// `:?` test works because the typed interfaces inherit from +/// the markers. +/// 3. Defaults are false — operators that don't override and don't +/// implement any algebra marker report all four capabilities as +/// false (the conservative null hypothesis). + +open System +open System.Threading +open System.Threading.Tasks +open Xunit +open FsUnit.Xunit +open Zeta.Core + + +// ───────────────────────────────────────────────────────────────── +// Helpers +// ───────────────────────────────────────────────────────────────── + +/// Find the first operator with a given `Name` in the circuit's +/// registered op set. Internal-op names are stable strings ("map", +/// "filter", "join", etc.) declared on each subclass. +let private opByName (circuit: Circuit) (name: string) : Op = + circuit.Ops + |> Seq.find (fun op -> op.Name = name) + + +// ───────────────────────────────────────────────────────────────── +// Internal-operator capability tests +// +// Each test wires a minimal one-or-two-op circuit, then asserts the +// capability flags on the named operator. Build the circuit but +// do NOT step — we only inspect the registered metadata. +// ───────────────────────────────────────────────────────────────── + +[] +let ``MapZSetOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.Map(input.Stream, Func(fun x -> x * 2)) + c.Build() + let op = opByName c "map" + op.IsLinear |> should equal true + op.IsBilinear |> should equal false + op.IsSink |> should equal false + op.IsStatefulStrict |> should equal false + + +[] +let ``FilterZSetOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.Filter(input.Stream, Func(fun x -> x > 0)) + c.Build() + (opByName c "filter").IsLinear |> should equal true + + +[] +let ``FlatMapZSetOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.FlatMap(input.Stream, Func>(fun x -> ZSet.singleton x 1L)) + c.Build() + (opByName c "flatMap").IsLinear |> should equal true + + +[] +let ``NegZSetOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.Negate input.Stream + c.Build() + (opByName c "neg").IsLinear |> should equal true + + +[] +let ``PlusZSetOp declares no algebra capability (additive, not unary-linear)`` () = + let c = Circuit() + let a = c.ZSetInput() + let b = c.ZSetInput() + let _ = c.Plus(a.Stream, b.Stream) + c.Build() + let op = opByName c "plus" + // Plus is a 2-input additive op: Plus(0, b) = b ≠ 0, so it's + // neither unary-linear nor bilinear under the strict definitions. + // A future capability `IsAdditive` could capture the per-input + // distribution property; for now Plus reports all caps false. + op.IsLinear |> should equal false + op.IsBilinear |> should equal false + + +[] +let ``MinusZSetOp declares no algebra capability (same reason as Plus)`` () = + let c = Circuit() + let a = c.ZSetInput() + let b = c.ZSetInput() + let _ = c.Minus(a.Stream, b.Stream) + c.Build() + let op = opByName c "minus" + op.IsLinear |> should equal false + op.IsBilinear |> should equal false + + +[] +let ``DistinctZSetOp does NOT declare IsLinear (clamps weights)`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.Distinct input.Stream + c.Build() + let op = opByName c "distinct" + // distinct(a + b) ≠ distinct(a) + distinct(b) when both share + // a positive-weighted key — distinct clamps to {0, 1}. + op.IsLinear |> should equal false + op.IsBilinear |> should equal false + + +[] +let ``JoinZSetOp declares IsBilinear`` () = + let c = Circuit() + let a = c.ZSetInput() + let b = c.ZSetInput() + let _ = + c.Join( + a.Stream, b.Stream, + Func(fst), + Func(fst), + Func(fun (_, s) (_, n) -> $"%s{s}-%d{n}")) + c.Build() + let op = opByName c "join" + op.IsLinear |> should equal false + op.IsBilinear |> should equal true + + +[] +let ``CartesianZSetOp declares IsBilinear`` () = + let c = Circuit() + let a = c.ZSetInput() + let b = c.ZSetInput() + let _ = c.Cartesian(a.Stream, b.Stream) + c.Build() + (opByName c "cartesian").IsBilinear |> should equal true + + +[] +let ``IndexWithOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = + c.IndexWith( + input.Stream, + Func(fst), + Func(snd)) + c.Build() + (opByName c "indexWith").IsLinear |> should equal true + + +[] +let ``IndexedJoinOp declares IsBilinear`` () = + let c = Circuit() + let a = c.ZSetInput() + let b = c.ZSetInput() + let ia = + c.IndexWith( + a.Stream, Func(fst), Func(snd)) + let ib = + c.IndexWith( + b.Stream, Func(fst), Func(snd)) + let _ = + c.IndexedJoin( + ia, ib, + Func(fun _ s n -> $"%s{s}-%d{n}")) + c.Build() + (opByName c "indexedJoin").IsBilinear |> should equal true + + +[] +let ``DelayOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.DelayZSet input.Stream + c.Build() + let op = opByName c "z^-1" + op.IsLinear |> should equal true + // z⁻¹ is also strict — preserved unchanged from before. + op.IsStrict |> should equal true + + +[] +let ``IntegrateOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.IntegrateZSet input.Stream + c.Build() + (opByName c "integrate").IsLinear |> should equal true + + +[] +let ``DifferentiateOp declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.DifferentiateZSet input.Stream + c.Build() + (opByName c "differentiate").IsLinear |> should equal true + + +[] +let ``ConstantOp does NOT declare IsLinear (affine, not linear)`` () = + let c = Circuit() + let _ = c.Constant 42 + c.Build() + // const_c(0) = c ≠ 0 (unless c = 0), so Constant is affine + // in general — not linear. We default to false. + (opByName c "const").IsLinear |> should equal false + + +[] +let ``FilterMapOp (fused) declares IsLinear`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = + c.FilterMap( + input.Stream, + Func(fun x -> x > 0), + Func(fun x -> x * 2)) + c.Build() + (opByName c "filterMap").IsLinear |> should equal true + + +// ───────────────────────────────────────────────────────────────── +// Plugin-marker detection tests +// +// These exercise the non-generic-marker pattern: a plugin that +// implements `ILinearOperator<'A, 'B>` automatically satisfies +// `ILinearMarker` via interface inheritance, and the +// `PluginOperatorAdapter` constructor caches a `:? ILinearMarker` +// test result that's then surfaced via `Op.IsLinear`. +// +// Critical: we test the GENERIC interface implementation, not the +// marker directly. If the inheritance chain is broken, the runtime +// `:?` test returns false even though the typed interface is +// declared — that's the bug class this layer is preventing. +// ───────────────────────────────────────────────────────────────── + +/// Plugin that implements `ILinearOperator` — the adapter +/// should detect `ILinearMarker` and surface `IsLinear = true`. +type private LinearPluginOp(input: Stream) = + let deps = [| input.AsDependency() |] + interface ILinearOperator with + member _.Name = "test-linear" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish (input.Current * 3) + ValueTask.CompletedTask + + +/// Plugin that implements `IBilinearOperator` — +/// adapter should detect `IBilinearMarker`. Uses a single input for +/// test simplicity; bilinearity is declarative here, not exercised. +type private BilinearPluginOp(input: Stream) = + let deps = [| input.AsDependency() |] + interface IBilinearOperator with + member _.Name = "test-bilinear" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +/// Plugin that implements `ISinkOperator` — adapter +/// should detect `ISinkMarker`. +type private SinkPluginOp(input: Stream) = + let deps = [| input.AsDependency() |] + interface ISinkOperator with + member _.Name = "test-sink" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +/// Plugin that implements `IStatefulStrictOperator` — +/// adapter should detect `IStatefulStrictMarker`. +type private StatefulStrictPluginOp(input: Stream) = + let deps = [| input.AsDependency() |] + interface IStatefulStrictOperator with + member _.Name = "test-stateful-strict" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +/// Plugin that implements ONLY `IOperator` — no algebra +/// marker. All four capabilities must report false. +type private PlainPluginOp(input: Stream) = + let deps = [| input.AsDependency() |] + interface IOperator with + member _.Name = "test-plain" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +[] +let ``Plugin ILinearOperator → adapter reports IsLinear = true`` () = + let c = Circuit() + let input = c.Constant 0 + let _ = c.RegisterStream (LinearPluginOp input :> IOperator) + c.Build() + let op = opByName c "test-linear" + op.IsLinear |> should equal true + op.IsBilinear |> should equal false + op.IsSink |> should equal false + op.IsStatefulStrict |> should equal false + + +[] +let ``Plugin IBilinearOperator → adapter reports IsBilinear = true`` () = + let c = Circuit() + let input = c.Constant 0 + let _ = c.RegisterStream (BilinearPluginOp input :> IOperator) + c.Build() + let op = opByName c "test-bilinear" + op.IsLinear |> should equal false + op.IsBilinear |> should equal true + op.IsSink |> should equal false + op.IsStatefulStrict |> should equal false + + +[] +let ``Plugin ISinkOperator → adapter reports IsSink = true`` () = + let c = Circuit() + let input = c.Constant 0 + let _ = c.RegisterStream (SinkPluginOp input :> IOperator) + c.Build() + let op = opByName c "test-sink" + op.IsLinear |> should equal false + op.IsBilinear |> should equal false + op.IsSink |> should equal true + op.IsStatefulStrict |> should equal false + + +[] +let ``Plugin IStatefulStrictOperator → adapter reports IsStatefulStrict = true`` () = + let c = Circuit() + let input = c.Constant 0 + let _ = c.RegisterStream (StatefulStrictPluginOp input :> IOperator) + c.Build() + let op = opByName c "test-stateful-strict" + op.IsStatefulStrict |> should equal true + + +[] +let ``Plain IOperator plugin → adapter reports all algebra caps false`` () = + let c = Circuit() + let input = c.Constant 0 + let _ = c.RegisterStream (PlainPluginOp input :> IOperator) + c.Build() + let op = opByName c "test-plain" + op.IsLinear |> should equal false + op.IsBilinear |> should equal false + op.IsSink |> should equal false + op.IsStatefulStrict |> should equal false + + +// ───────────────────────────────────────────────────────────────── +// BayesianRateOp end-to-end — the canonical real-world sink +// +// This test verifies that the ISinkMarker inheritance correctly +// surfaces through Zeta.Bayesian's `BayesianRateOp`. We can't +// exercise it directly here (Zeta.Bayesian isn't a test-project +// reference and shouldn't be — that'd be a circular-shape +// test), so we rely on the SinkPluginOp above as the structural +// proof. Adding Zeta.Bayesian to this project's references is +// out of scope for PR 1. +// ───────────────────────────────────────────────────────────────── diff --git a/tests/Tests.FSharp/Tests.FSharp.fsproj b/tests/Tests.FSharp/Tests.FSharp.fsproj index f9c9cb4a88..f6f8df46ed 100644 --- a/tests/Tests.FSharp/Tests.FSharp.fsproj +++ b/tests/Tests.FSharp/Tests.FSharp.fsproj @@ -106,6 +106,7 @@ + From 613b7c93b1c879cc3e710837b2b67c7213cb7625 Mon Sep 17 00:00:00 2001 From: Aaron Stainback Date: Thu, 21 May 2026 13:24:14 -0400 Subject: [PATCH 2/2] =?UTF-8?q?feat(core):=20enforce=20sink-terminality=20?= =?UTF-8?q?at=20Circuit.Build()=20=E2=80=94=20PR=202=20of=208?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Makes load-bearing the docstring promise on `ISinkOperator` (PluginApi.fs): *"the scheduler enforces terminal placement (a sink may not feed another operator inside a relational path)"* — until this PR, that promise was vapor. ## Mechanism After the topological sort succeeds in `Circuit.Build()`, scan every operator's `Inputs` array. If any input has `IsSink = true`, throw an `InvalidOperationException` naming both endpoints, their IDs, and a pointer to the algebra-tag contract. O(N + E) per build; runs once per circuit lifetime; zero per-tick cost. The check runs AFTER topological sort so: - Operator IDs in the error message are stable (post-Build). - Cycle detection (the more structurally-fatal problem) fires first when both are present. ## Why this matters Sinks are retraction-lossy by design — `BayesianRateOp` aggregates state in a `BetaBernoulli` instance that doesn't un-accumulate when a `-1` weight arrives. Letting a downstream operator read from a sink would break the Z-set relational composition laws (associativity over add, commutativity, distribution through joins). The compile-time type-checker can catch some cases (when the sink's output type doesn't match downstream's input type), but generic-typed sinks like `ISinkOperator, ZSet>` would slip through without this runtime check. ## Tests 9 new tests in `tests/Tests.FSharp/Circuit/SinkTerminality.Tests.fs`: Positive (terminal sinks build normally): - Sink at terminus, single op upstream - Sink consuming a Map output (sink itself at terminus) - Multiple independent sinks - Non-sink plugin feeding Map (rejection is sink-specific) Negative (operators reading from sinks are rejected): - Map reading from sink output - Filter reading from sink output - Plus reading from sink output (multi-input op case) Error-message contract: - Names both endpoints and their IDs - Cites PluginApi.fs:ISinkOperator - Explains the algebraic reason ("retraction-lossy") Ordering: - Cycle detection fires before sink-terminality All 9 pass. No regressions: 217/218 Circuit/Operators/Plugin tests pass (1 pre-existing SKIP unchanged). Build clean. ## Dependency PR 2 depends on PR 1 (#4558) for the `IsSink` property on `Op<'T>`. Branch is stacked on `feat/op-capability-tags-2026-05-21`; targets `main` and should auto-rebase cleanly once PR 1 merges. ## Foundation for later PRs PR 5's FusionEngine must NOT fuse across a sink boundary — the terminality check at Build time guarantees no sink ever appears mid-pipeline, simplifying the engine's fusion-pattern rules. --- src/Core/Circuit.fs | 27 +++ .../Circuit/SinkTerminality.Tests.fs | 185 ++++++++++++++++++ tests/Tests.FSharp/Tests.FSharp.fsproj | 1 + 3 files changed, 213 insertions(+) create mode 100644 tests/Tests.FSharp/Circuit/SinkTerminality.Tests.fs diff --git a/src/Core/Circuit.fs b/src/Core/Circuit.fs index d72fda5296..ee04cc1209 100644 --- a/src/Core/Circuit.fs +++ b/src/Core/Circuit.fs @@ -230,6 +230,33 @@ type Circuit() = if inDeg.[c] = 0 then q.Enqueue c if k <> n then invalidOp "Circuit has a cycle that does not pass through a strict operator" + // Sink-terminality enforcement. The `ISinkOperator` docstring + // (PluginApi.fs) promises *"the scheduler enforces terminal + // placement (a sink may not feed another operator inside a + // relational path)"*. PR 2 makes that promise load-bearing: + // any operator whose `Inputs` contains a sink is rejected + // with a diagnostic naming both endpoints, the sink's + // position in the DAG, and a pointer to the algebra-tag + // contract. Sinks are retraction-lossy by design (e.g. + // BayesianRateOp aggregates state that doesn't un-accumulate); + // letting a downstream operator read from a sink would + // violate the relational composition laws Z-set algebra + // depends on. + // + // Defensive ordering: this runs AFTER the topo-sort succeeds + // so the error message can reference op IDs that are stable. + // O(N + E) — each edge checked exactly once. + for op in ops do + for dep in op.Inputs do + if dep.IsSink then + invalidOp + (sprintf + "Sink-terminality violation: operator '%s' (id=%d) reads from \ + sink operator '%s' (id=%d). Sinks are terminal — they may not \ + feed other operators in a relational path because sink state is \ + retraction-lossy and breaks Z-set composition laws. See \ + PluginApi.fs:ISinkOperator for the contract." + op.Name op.Id dep.Name dep.Id) schedule <- order let sn = ResizeArray() for op in ops do if op.IsStrict then sn.Add op diff --git a/tests/Tests.FSharp/Circuit/SinkTerminality.Tests.fs b/tests/Tests.FSharp/Circuit/SinkTerminality.Tests.fs new file mode 100644 index 0000000000..beee259f5c --- /dev/null +++ b/tests/Tests.FSharp/Circuit/SinkTerminality.Tests.fs @@ -0,0 +1,185 @@ +module Zeta.Tests.Circuit.SinkTerminalityTests + +/// Tests for the sink-terminality enforcement added to `Circuit.Build()` +/// in PR 2. The `ISinkOperator` docstring promises *"the scheduler +/// enforces terminal placement (a sink may not feed another operator +/// inside a relational path)"* — until PR 2, that promise was vapor. +/// These tests make it load-bearing: +/// +/// 1. A circuit with a sink at the terminus builds normally. +/// 2. A circuit with an operator reading from a sink's output stream +/// is rejected at `Build()` time with a diagnostic naming both +/// endpoints, the sink's ID, and a pointer to the contract. +/// 3. The check is `O(N + E)` and runs once per Build — no per-tick +/// cost — and runs AFTER the topological sort succeeds so IDs +/// are stable. +/// +/// Composes with PR 1's `IsSink` capability tag on `Op<'T>` (which is +/// what the Build pass consults; without that property, this check +/// couldn't exist). + +open System +open System.Threading +open System.Threading.Tasks +open Xunit +open FsUnit.Xunit +open Zeta.Core + + +// ───────────────────────────────────────────────────────────────── +// Test sinks +// +// We make sinks that *output* a typed `ZSet` (rather than a +// bare scalar) so we can wire a `Map` / `Filter` downstream and +// exercise the rejection path. The sink-ness comes from the +// `ISinkOperator` interface declaration, not from the output type; +// in a real BayesianRateOp the sink-ness comes from non-Z-set output +// and retraction-lossy state. The test cares about the structural +// declaration, not the output shape. +// ───────────────────────────────────────────────────────────────── + +/// A sink that emits a Z-set output — declared `ISinkOperator`, so +/// `Circuit.Build()` should reject any operator reading from this +/// output stream. +type private ZSetSinkOp(input: Stream>) = + let deps = [| input.AsDependency() |] + interface ISinkOperator, ZSet> with + member _.Name = "test-zset-sink" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +/// A plain (non-sink) plugin that emits a Z-set output — used to +/// verify the rejection is specific to sinks, not all plugins. +type private ZSetPassthroughOp(input: Stream>) = + let deps = [| input.AsDependency() |] + interface IOperator> with + member _.Name = "test-zset-passthrough" + member _.ReadDependencies = deps + member _.StepAsync(out, _ct) = + out.Publish input.Current + ValueTask.CompletedTask + + +// ───────────────────────────────────────────────────────────────── +// Positive cases — terminal sinks build normally +// ───────────────────────────────────────────────────────────────── + +[] +let ``Circuit with a sink at the terminus builds normally`` () = + let c = Circuit() + let input = c.ZSetInput() + // Sink is the only consumer of input.Stream — terminal position. + let _ = c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + c.Build() + c.IsBuilt |> should equal true + c.OperatorCount |> should equal 2 // input + sink + + +[] +let ``Circuit with sink reading from Map (sink at terminus) builds normally`` () = + let c = Circuit() + let input = c.ZSetInput() + let mapped = c.Map(input.Stream, Func(fun x -> x * 2)) + let _ = c.RegisterStream (ZSetSinkOp mapped :> IOperator>) + c.Build() + c.IsBuilt |> should equal true + // input + map + sink + (registration plumbing) + c.OperatorCount |> should be (greaterThanOrEqualTo 3) + + +[] +let ``Circuit with multiple sinks at independent termini builds normally`` () = + let c = Circuit() + let input = c.ZSetInput() + let _ = c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + let _ = c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + c.Build() + c.IsBuilt |> should equal true + + +[] +let ``Circuit with a non-sink plugin feeding a Map builds normally (rejection is sink-specific)`` () = + let c = Circuit() + let input = c.ZSetInput() + let passthrough = c.RegisterStream (ZSetPassthroughOp input.Stream :> IOperator>) + let _ = c.Map(passthrough, Func(fun x -> x + 1)) + c.Build() + c.IsBuilt |> should equal true + + +// ───────────────────────────────────────────────────────────────── +// Negative cases — operators reading from sinks are rejected +// ───────────────────────────────────────────────────────────────── + +[] +let ``Circuit.Build rejects Map reading from sink output stream`` () = + let c = Circuit() + let input = c.ZSetInput() + let sinkStream = + c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + // Sin: wire a Map to consume the sink's output. Should fail at Build(). + let _ = c.Map(sinkStream, Func(fun x -> x * 2)) + (fun () -> c.Build()) |> shouldFail + + +[] +let ``Circuit.Build rejects Filter reading from sink output stream`` () = + let c = Circuit() + let input = c.ZSetInput() + let sinkStream = + c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + let _ = c.Filter(sinkStream, Func(fun x -> x > 0)) + (fun () -> c.Build()) |> shouldFail + + +[] +let ``Circuit.Build rejects Plus reading from sink output stream`` () = + let c = Circuit() + let a = c.ZSetInput() + let b = c.ZSetInput() + let sinkStream = + c.RegisterStream (ZSetSinkOp a.Stream :> IOperator>) + let _ = c.Plus(sinkStream, b.Stream) + (fun () -> c.Build()) |> shouldFail + + +[] +let ``Sink-rejection error message names both operators and IDs`` () = + let c = Circuit() + let input = c.ZSetInput() + let sinkStream = + c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + let _ = c.Map(sinkStream, Func(fun x -> x * 2)) + let ex = Assert.Throws(fun () -> c.Build()) + // Naming both endpoints helps the user locate the violation. + ex.Message |> should haveSubstring "map" + ex.Message |> should haveSubstring "test-zset-sink" + // Contract pointer so a confused reader knows where to look. + ex.Message |> should haveSubstring "PluginApi.fs" + // Honesty about WHY: the message should explain the algebraic + // reason, not just say "rejected". + ex.Message |> should haveSubstring "retraction-lossy" + + +[] +let ``Cycle detection runs before sink-terminality (so error messages stay focused)`` () = + // The Build pass orders: topo-sort first (would fail on cycle), + // sink-terminality second. A circuit with BOTH a cycle and a + // sink-rejection should report the cycle, not the sink, because + // cycle is the more structurally-fatal problem. This test asserts + // the ordering by constructing a sink-feeding-Map (which would + // fail sink-terminality) — and confirming that adding it does NOT + // induce a cycle (i.e. the test setup is correct). The sink + // rejection should fire because no cycle exists. + let c = Circuit() + let input = c.ZSetInput() + let sinkStream = + c.RegisterStream (ZSetSinkOp input.Stream :> IOperator>) + let _ = c.Map(sinkStream, Func(fun x -> x * 2)) + let ex = Assert.Throws(fun () -> c.Build()) + // Confirms sink-terminality fired, not cycle detection. + ex.Message |> should haveSubstring "Sink-terminality violation" + ex.Message |> should not' (haveSubstring "cycle") diff --git a/tests/Tests.FSharp/Tests.FSharp.fsproj b/tests/Tests.FSharp/Tests.FSharp.fsproj index f6f8df46ed..1a8fc8ad69 100644 --- a/tests/Tests.FSharp/Tests.FSharp.fsproj +++ b/tests/Tests.FSharp/Tests.FSharp.fsproj @@ -36,6 +36,7 @@ +