Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/Core/Circuit.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,52 @@ type Op() =
abstract IsAsync: bool
default _.IsAsync = false

// ─────────────────────────────────────────────────────────────────
// Algebra capability tags. Promoted from plugin-only marker
// interfaces (PluginApi.fs) to first-class fields on the Op base
// class so internal operators and plugin operators declare
// capabilities through the same surface. The scheduler, fusion
// engine, and incremental-rewriter dispatcher all consult these
// fields — they're load-bearing for capability-aware optimization,
// not decorative.
//
// Default false; each concrete operator overrides only the
// capabilities it actually has. Lying about a tag is an algebraic
// contract violation — LawRunner can verify each in test mode.
// ─────────────────────────────────────────────────────────────────

/// Algebra capability: operator is *linear* — `op(a + b) = op(a) +
/// op(b)` and `op(0) = 0`. Retraction-native: a negative-weight
/// input un-accumulates correctly. `IncrementalAuto` uses this to
/// emit `Q^Δ = Q` (linear operators incrementalize trivially).
abstract IsLinear: bool
default _.IsLinear = false

/// Algebra capability: operator is *bilinear* in its two inputs.
/// `op(a₁+a₂, b) = op(a₁, b) + op(a₂, b)` and symmetrically for the
/// second argument; additionally `op(0, b) = op(a, 0) = 0`.
/// `IncrementalAuto` uses this to emit the three-term incremental
/// form `Δa ⋈ Δb + z⁻¹(I(a)) ⋈ Δb + Δa ⋈ z⁻¹(I(b))`.
Comment on lines +69 to +70
abstract IsBilinear: bool
default _.IsBilinear = false

/// Algebra capability: operator is a *sink* — terminal,
/// retraction-lossy, may emit a non-Z-set output. Sinks are
/// excluded from relational composition: `Circuit.Build()` rejects
/// any operator that reads from a sink's output stream (terminal-
/// placement enforcement). Bayesian aggregates and external-system
/// sinks are canonical examples.
abstract IsSink: bool
default _.IsSink = false

/// Algebra capability: operator carries explicit stateful-strict
/// semantics — init/step/retract triple — distinct from `IsStrict`
/// (feedback-cut). Stateful-strict operators hold per-key state
/// that must retract cleanly when a negative-weight input arrives.
/// `LawRunner.checkRetractionCompleteness` verifies the claim.
abstract IsStatefulStrict: bool
default _.IsStatefulStrict = false


/// An operator with a typed output slot.
///
Expand Down
7 changes: 7 additions & 0 deletions src/Core/Fusion.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type internal FilterMapOp<'A, 'B when 'A : comparison and 'B : comparison>
let inputs = [| input :> Op |]
override _.Name = "filterMap"
override _.Inputs = inputs
/// Linear: filter ∘ map is the composition of two linear ops —
/// distributes over Z-set addition. The fused implementation
/// preserves linearity by construction.
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
let span = input.Value.AsSpan()
if span.IsEmpty then
Expand Down Expand Up @@ -50,6 +54,9 @@ type internal FilterMapOptionalOp<'A, 'B when 'A : comparison and 'B : compariso
let inputs = [| input :> Op |]
override _.Name = "filterMap"
override _.Inputs = inputs
/// Linear: same reasoning as FilterMapOp — the choose-shaped
/// `pickMap` is equivalent to a filter+map composition.
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
let span = input.Value.AsSpan()
if span.IsEmpty then
Expand Down
71 changes: 71 additions & 0 deletions src/Core/Incremental.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,77 @@ type IncrementalExtensions =
let prev = this.DelayZSet integrated // z^-1(I(Δa)) = snapshot before this tick
this.DistinctIncremental(prev, delta)

/// **Capability-aware incremental dispatcher.** Picks the right
/// incrementalization for `q` based on the algebra capability tag
/// on the operator `q` produces:
///
/// - `IsLinear = true` → `Q^Δ = Q` (trivial; deltas pass through)
/// - `IsSink = true` → throws (sinks are terminal; can't be
/// incrementalized — see PR 2's
/// sink-terminality enforcement)
/// - otherwise → `D ∘ Q ∘ I` fallback (always correct
/// but expensive — materializes the full
/// integrated relation every tick)
///
/// ## How the dispatch works
///
/// `q` is a `Stream → Stream` factory. We probe it by applying to
/// the delta `input`; the resulting `Stream<'TOut>.Op` carries the
/// capability tags from PR 1. We then either return the probed
/// output directly (linear case — the probed op IS the correct
/// incremental result because linear ops commute with `D`/`I`) or
/// throw (sink case) or rebuild via the `D ∘ Q ∘ I` fallback.
///
/// ## Known cost: orphan operator in the non-linear fallback path
///
/// When the fallback fires, the probed operator stays registered
/// in the circuit with no consumers — the scheduler will still tick
/// it every cycle (wasted work), but it produces no observable
/// output. Pruning unreachable operators at `Circuit.Build()` time
/// is a future improvement; the dispatcher's correctness doesn't
/// depend on it.
///
/// ## When NOT to use this
///
/// For bilinear joins, use `IncrementalJoin` directly — it has a
/// richer signature (key functions, combine function) that this
/// unary dispatcher can't express. A `IncrementalAutoJoin`
/// dispatcher for bilinear ops can be added in a later PR.
///
/// ## Reads
///
/// `Op.IsLinear` and `Op.IsSink` (PR 1). For internal operators
/// these are overridden in the concrete subclasses; for plugin
/// operators they're surfaced via the `PluginOperatorAdapter`
/// marker detection.
[<Extension>]
static member IncrementalAuto<'K when 'K : comparison>
(this: Circuit,
q: Func<Stream<ZSet<'K>>, Stream<ZSet<'K>>>,
input: Stream<ZSet<'K>>) : Stream<ZSet<'K>> =
// Probe: apply q to input directly to inspect the resulting
// operator's capability tag. Registration is a side effect;
// see "orphan operator" note above.
let probedOutput = q.Invoke input
let resultOp = probedOutput.Op
if resultOp.IsSink then
invalidOp
(sprintf
"IncrementalAuto: cannot incrementalize a sink operator. \
'%s' (id=%d) declared IsSink=true; sinks are terminal \
and excluded from relational composition. Use a \
non-sink operator, or consume the sink's output \
directly without incrementalization."
resultOp.Name resultOp.Id)
elif resultOp.IsLinear then
// Q^Δ = Q. For linear q, q(delta) IS the delta of q(full).
Comment on lines +132 to +143
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Determine linearity from full query, not terminal operator

IncrementalAuto decides the strategy from probedOutput.Op.IsLinear, which only describes the final operator in q, not the whole q graph. This misclassifies composite queries such as q = map ∘ distinct: the terminal map is linear, so the dispatcher returns q(delta) even though distinct upstream is non-linear and requires D∘Q∘I. In that case the second duplicate delta is emitted incorrectly instead of being suppressed, so incremental results diverge from the true query delta.

Useful? React with 👍 / 👎.

// The probed output is correct as-is; return directly.
probedOutput
else
// Generic D ∘ Q ∘ I fallback. The probed op above is orphan
// in the circuit — wasteful but functionally correct.
this.IncrementalizeZSet(q, input)


/// F#-idiomatic piping. `stream |> Stream.map f |> Stream.filter p` is
/// equivalent to `circuit.Filter(circuit.Map(stream, f), p)` but reads in
Expand Down
25 changes: 25 additions & 0 deletions src/Core/Operators.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ type internal MapZSetOp<'A, 'B when 'A : comparison and 'B : comparison>(input:
let inputs = [| input :> Op |]
override _.Name = "map"
override _.Inputs = inputs
/// Linear: ZSet.map f distributes over Z-set addition because
/// `f` rewrites keys without touching weights, and equal keys sum
/// their weights linearly.
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <- ZSet.map f.Invoke input.Value
ValueTask.CompletedTask
Expand All @@ -24,6 +28,8 @@ type internal FilterZSetOp<'K when 'K : comparison>(input: Op<ZSet<'K>>, predica
let inputs = [| input :> Op |]
override _.Name = "filter"
override _.Inputs = inputs
/// Linear: predicate is weight-independent, so filter(a+b) = filter(a)+filter(b).
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <- ZSet.filter predicate.Invoke input.Value
ValueTask.CompletedTask
Expand All @@ -36,6 +42,9 @@ type internal FlatMapZSetOp<'A, 'B when 'A : comparison and 'B : comparison>
let inputs = [| input :> Op |]
override _.Name = "flatMap"
override _.Inputs = inputs
/// Linear: ZSet.flatMap scales `f k` by the input entry's weight
/// before accumulating — distributes over Z-set addition.
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <- ZSet.flatMap f.Invoke input.Value
ValueTask.CompletedTask
Expand Down Expand Up @@ -69,6 +78,8 @@ type internal NegZSetOp<'K when 'K : comparison>(a: Op<ZSet<'K>>) =
let inputs = [| a :> Op |]
override _.Name = "neg"
override _.Inputs = inputs
/// Linear: -(a + b) = -a + -b and -0 = 0.
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <- ZSet.neg a.Value
ValueTask.CompletedTask
Expand Down Expand Up @@ -108,6 +119,10 @@ type internal JoinZSetOp<'A, 'B, 'K, 'C
let inputs = [| a :> Op; b :> Op |]
override _.Name = "join"
override _.Inputs = inputs
/// Bilinear: (a₁+a₂) ⋈ b = (a₁ ⋈ b) + (a₂ ⋈ b), symmetric in b,
/// and 0 ⋈ b = a ⋈ 0 = 0. IncrementalAuto rewrites this to the
/// three-term form `Δa ⋈ Δb + z⁻¹(I(a)) ⋈ Δb + Δa ⋈ z⁻¹(I(b))`.
Comment on lines +123 to +124
override _.IsBilinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <-
ZSet.join
Expand All @@ -125,6 +140,9 @@ type internal CartesianZSetOp<'A, 'B when 'A : comparison and 'B : comparison>
let inputs = [| a :> Op; b :> Op |]
override _.Name = "cartesian"
override _.Inputs = inputs
/// Bilinear: weights multiply (Checked.* in ZSet.cartesian); the
/// product distributes over Z-set addition in each argument.
override _.IsBilinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <- ZSet.cartesian a.Value b.Value
ValueTask.CompletedTask
Expand Down Expand Up @@ -179,6 +197,10 @@ type internal IndexWithOp<'A, 'K, 'V
let inputs = [| input :> Op |]
override _.Name = "indexWith"
override _.Inputs = inputs
/// Linear: indexing distributes over Z-set addition because the
/// (key, value) extraction is weight-independent and the
/// per-key value groups sum via ZSet.add.
override _.IsLinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <- IndexedZSet.indexWith key.Invoke value.Invoke input.Value
ValueTask.CompletedTask
Expand All @@ -194,6 +216,9 @@ type internal IndexedJoinOp<'K, 'VA, 'VB, 'C
let inputs = [| a :> Op; b :> Op |]
override _.Name = "indexedJoin"
override _.Inputs = inputs
/// Bilinear: per-key value-group cartesian; weights multiply
/// (Checked.* in IndexedZSet.join); distributes per-arg.
override _.IsBilinear = true
override this.StepAsync(_: CancellationToken) =
this.Value <-
IndexedZSet.join
Expand Down
62 changes: 57 additions & 5 deletions src/Core/PluginApi.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,41 +95,77 @@ type INestedFixpointParticipant =
abstract Fixedpoint : scope: int -> bool


// ─────────────────────────────────────────────────────────────────────
// Algebra capability tags — non-generic markers + typed interfaces.
//
// F# (and the CLR) cannot test `:? IBilinearOperator<obj, obj, 'TOut>`
// against a concrete `IBilinearOperator<'A, 'B, 'C>` instance because
// generic-interface type tests require exact type-parameter match.
// The fix is the BCL pattern used by `IEnumerable` vs `IEnumerable<T>`:
// a non-generic marker interface for runtime type tests, and a
// generic interface (inheriting the marker) for typed access.
//
// Plugin authors only need to implement the typed interface; the
// marker is satisfied automatically via interface inheritance. The
// scheduler / fusion engine / IncrementalAuto dispatcher tests
// against the marker, not the generic.
// ─────────────────────────────────────────────────────────────────────

/// Non-generic marker for `ILinearOperator<_, _>`. Used by
/// `PluginOperatorAdapter` and the scheduler for `:?` runtime tests
/// that don't need exact generic-parameter match.
type ILinearMarker = interface end

/// Non-generic marker for `IBilinearOperator<_, _, _>`.
type IBilinearMarker = interface end

/// Non-generic marker for `ISinkOperator<_, _>`.
type ISinkMarker = interface end

/// Non-generic marker for `IStatefulStrictOperator<_, _, _>`.
type IStatefulStrictMarker = interface end


/// Algebra capability: the operator is *linear* — `op(a + b) =
/// op(a) + op(b)` and `op(0) = 0`. Retraction-native: a
/// negative weight un-accumulates correctly. Declared at the
/// type level so the scheduler can run `LinearLaw` at
/// `Circuit.Build()`.
/// `Circuit.Build()` (test-time, via `LawRunner.checkLinear`).
type ILinearOperator<'TIn, 'TOut> =
Comment on lines 129 to 134
inherit IOperator<'TOut>
inherit ILinearMarker


/// Algebra capability: the operator is *bilinear* in two
/// inputs (e.g. a join). Incrementalisation generates the
/// standard `Δa ⋈ Δb + z^-1(I(a)) ⋈ Δb + Δa ⋈ z^-1(I(b))`
/// form.
/// form. Verified by `LawRunner.checkBilinear` (when available).
type IBilinearOperator<'TIn1, 'TIn2, 'TOut> =
inherit IOperator<'TOut>
inherit IBilinearMarker


/// Algebra capability: the operator is a *sink* — terminal,
/// non-Z-set-emitting, potentially retraction-lossy. Sink
/// operators are consciously exempt from relational
/// composition laws and the scheduler enforces terminal
/// placement (a sink may not feed another operator inside a
/// relational path). Bayesian aggregates are the canonical
/// example.
/// relational path) via the `Circuit.Build()` validation pass.
/// Bayesian aggregates are the canonical example.
type ISinkOperator<'TIn, 'TOut> =
inherit IOperator<'TOut>
inherit ISinkMarker


/// Algebra capability: the operator carries explicit stateful
/// strict semantics — init / step / retract triple. Distinct
/// from `IStrictOperator` (feedback-cut): stateful-strict ops
/// hold per-key or per-instance state that must retract
/// cleanly when a negative weight arrives.
/// cleanly when a negative weight arrives. Verified by
/// `LawRunner.checkRetractionCompleteness`.
type IStatefulStrictOperator<'TIn, 'TState, 'TOut> =
inherit IOperator<'TOut>
inherit IStatefulStrictMarker


/// Internal adapter: wraps an `IOperator<'T>` inside an
Expand Down Expand Up @@ -164,6 +200,17 @@ type internal PluginOperatorAdapter<'TOut>(plugin: IOperator<'TOut>, inputOps: O
| :? INestedFixpointParticipant as f -> Some f
| _ -> None

// Algebra capability detection via non-generic markers. The typed
// interfaces (`ILinearOperator<_,_>` etc.) inherit from these
// markers, so a plugin implementing `ILinearOperator<int, string>`
// satisfies `ILinearMarker` automatically. Cached once at
// construction; the adapter pays zero per-tick cost for capability
// surfacing.
let isLinearCap = (box plugin) :? ILinearMarker
let isBilinearCap = (box plugin) :? IBilinearMarker
let isSinkCap = (box plugin) :? ISinkMarker
let isStatefulStrictCap = (box plugin) :? IStatefulStrictMarker

member internal _.PublishCount = publishCount

override _.Name = plugin.Name
Expand All @@ -177,6 +224,11 @@ type internal PluginOperatorAdapter<'TOut>(plugin: IOperator<'TOut>, inputOps: O
| Some a -> a.IsAsync
| None -> false

override _.IsLinear = isLinearCap
override _.IsBilinear = isBilinearCap
override _.IsSink = isSinkCap
override _.IsStatefulStrict = isStatefulStrictCap

override this.StepAsync(ct: CancellationToken) : ValueTask =
let buffer = OutputBuffer<'TOut>(this, publishCount)
plugin.StepAsync(buffer, ct)
Expand Down
14 changes: 14 additions & 0 deletions src/Core/Primitive.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ type internal DelayOp<'T>(input: Op<'T>, initial: 'T) =
override _.Name = "z^-1"
override _.Inputs = inputs
override _.IsStrict = true
/// Linear: `z⁻¹` is a time-shift; it distributes over addition
/// trivially when `initial = 0` for the group. Callers passing a
/// non-zero initial are responsible for the resulting affine
/// offset — DBSP usage always passes the group zero.
override _.IsLinear = true
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Gate Delay linear tag on zero initial value

DelayOp is marked IsLinear = true unconditionally, but Delay(s, initial) is affine when initial is non-zero (delay(0) emits initial on the first tick). With IncrementalAuto, any query containing Delay with a non-empty initial Z-set can be routed down the linear fast path (Q^Δ = Q) and produce wrong deltas on the first tick. The capability should only be linear when the initial value is the group zero.

Useful? React with 👍 / 👎.

Comment on lines +17 to +21
override this.StepAsync(_: CancellationToken) =
this.Value <- state
ValueTask.CompletedTask
Expand All @@ -35,6 +40,11 @@ type internal IntegrateOp<'T>(input: Op<'T>, zero: 'T, add: Func<'T, 'T, 'T>) =
let mutable state = zero
override _.Name = "integrate"
override _.Inputs = inputs
/// Linear: `I` is the running sum operator; it commutes with the
/// group operation by associativity/commutativity of the supplied
/// `add` function. Caller is responsible for passing a true
/// group `(zero, add)` — Z-set's `(Empty, ZSet.add)` qualifies.
override _.IsLinear = true
// Integration is causal but NOT strict (per the DBSP paper, §2.18). The
// scheduler runs us in topological order *after* `input`, so at StepAsync
// time `input.Value` is the current tick's delta. We update state and
Expand All @@ -52,6 +62,10 @@ type internal DifferentiateOp<'T>(input: Op<'T>, zero: 'T, sub: Func<'T, 'T, 'T>
let mutable prev = zero
override _.Name = "differentiate"
override _.Inputs = inputs
/// Linear: `D` distributes over the group operation by linearity
/// of `sub` on an abelian group. Inverse of `IntegrateOp` (modulo
/// initial conditions).
override _.IsLinear = true
// IsStrict remains false — non-strict ops run in topo order so by the time
// we execute, the input operator's `Value` reflects the current tick.
override this.StepAsync(_: CancellationToken) =
Expand Down
Loading
Loading