Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions src/Core/LawRunner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,129 @@ module LawRunner =
tick <- tick + 1
result)

/// Bilinearity — three sub-properties, each per-tick:
///
/// L1 (left-linearity): op(a₁ + a₂, b) ≡ op(a₁, b) + op(a₂, b)
/// L2 (right-linearity): op(a, b₁ + b₂) ≡ op(a, b₁) + op(a, b₂)
/// L3 (sign-distribution): op(-a, b) ≡ -op(a, b)
///
/// **Math note on L3.** Over an abelian group with standard
/// addition (as `(+)` is for `int`, and as `ZSet.add` is for
/// `ZSet<'K>`), L1 + L2 *imply* L3: setting `a₁ = a, a₂ = -a`
/// in L1 gives `op(0, b) = op(a, b) + op(-a, b)`, so the
/// classical bilinear condition `op(0, b) = 0` collapses to L3.
/// In that regime L3 is the "first failure to fire" line of
/// defense — an affine offset like `op(a, b) = a*b + c` breaks
/// L1 (the constant lands once on LHS, twice on RHS) AND L3
/// (the constant survives negation), so L1 usually trips first
/// by check-order; L3 is the cleanup law.
///
/// L3 becomes load-bearing — not redundant — when the caller-
/// supplied `(addOut, negOut)` pair doesn't actually form an
/// abelian group: `negOut` might not be a true inverse of
/// `addOut`, the operations might not be associative or
/// commutative on `'TOut`, or there might be hidden state in
/// the supplied functions. In such cases L1 + L2 can pass while
/// L3 catches the broken algebra — and a failure here may
/// reflect the *supplied algebra operations* rather than the
/// operator under test. Checking all three sub-properties keeps
/// `checkBilinear` correct across the full range of `'TOut`
/// algebras a plugin author might supply, not just `int` /
/// `ZSet<_>` where the abelian-group assumption holds by
/// construction.
///
/// - `samples` — number of (A₁, A₂, B₁, B₂) quadruples.
/// - `scheduleLength` — ticks per trace. Each per-argument trace
/// uses one fresh draw per tick, so longer schedules exercise
/// stateful bilinear ops (e.g. windowed joins).
/// - `negIn1` — required for the L3 sign-distribution check on
/// the first argument. We test L3 only on the first argument
/// because L2 + L3-on-first + abelian-group structure imply
/// L3-on-second; the asymmetry is intentional, mirroring what
/// `IncrementalJoin` actually requires.
/// - `negOut` — needed to form `-op(a, b)` for the L3 equality.
let checkBilinear<'TIn1, 'TIn2, 'TOut>
(seed: int)
(samples: int)
(scheduleLength: int)
(makeOp: Stream<'TIn1> -> Stream<'TIn2> -> IOperator<'TOut>)
(genInput1: System.Random -> 'TIn1)
(genInput2: System.Random -> 'TIn2)
(addIn1: 'TIn1 -> 'TIn1 -> 'TIn1)
(negIn1: 'TIn1 -> 'TIn1)
(addIn2: 'TIn2 -> 'TIn2 -> 'TIn2)
(addOut: 'TOut -> 'TOut -> 'TOut)
(negOut: 'TOut -> 'TOut)
(equalOut: 'TOut -> 'TOut -> bool)
: Result<unit, LawViolation> =
if samples < 1 then
Error (badArgs seed "samples must be >= 1")
elif scheduleLength < 1 then
Error (badArgs seed "scheduleLength must be >= 1")
else
runSamples seed samples (fun rng i ->
// Four independent traces for the per-arg laws.
let traceA1 = generateTrace rng scheduleLength genInput1
let traceA2 = generateTrace rng scheduleLength genInput1
let traceB1 = generateTrace rng scheduleLength genInput2
let traceB2 = generateTrace rng scheduleLength genInput2

let traceASum = List.map2 addIn1 traceA1 traceA2
let traceBSum = List.map2 addIn2 traceB1 traceB2
let traceANeg = traceA1 |> List.map negIn1

// Run all the per-arg cases through the same operator
// factory. Each run gets a fresh op instance — the
// bilinear law applies to STATELESS bilinearity per
// tick; stateful bilinear ops need a stronger
// formulation (a separate law, not in scope here).
let outA1B1 = PluginHarness.runTwoInputs makeOp traceA1 traceB1 |> List.toArray
let outA2B1 = PluginHarness.runTwoInputs makeOp traceA2 traceB1 |> List.toArray
let outASumB1 = PluginHarness.runTwoInputs makeOp traceASum traceB1 |> List.toArray
let outA1B2 = PluginHarness.runTwoInputs makeOp traceA1 traceB2 |> List.toArray
let outA1BSum = PluginHarness.runTwoInputs makeOp traceA1 traceBSum |> List.toArray
let outANegB1 = PluginHarness.runTwoInputs makeOp traceANeg traceB1 |> List.toArray

let mutable result = None
let mutable tick = 0
while result.IsNone && tick < scheduleLength do
// L1: op(a₁+a₂, b) ≡ op(a₁, b) + op(a₂, b)
let l1Lhs = outASumB1.[tick]
let l1Rhs = addOut outA1B1.[tick] outA2B1.[tick]
if not (equalOut l1Lhs l1Rhs) then
result <-
Some (sprintf
"Left-linearity broke at sample %d, tick %d: \
op(a₁+a₂, b) != op(a₁, b) + op(a₂, b)."
i tick)
else
// L2: op(a, b₁+b₂) ≡ op(a, b₁) + op(a, b₂)
let l2Lhs = outA1BSum.[tick]
let l2Rhs = addOut outA1B1.[tick] outA1B2.[tick]
if not (equalOut l2Lhs l2Rhs) then
result <-
Some (sprintf
"Right-linearity broke at sample %d, tick %d: \
op(a, b₁+b₂) != op(a, b₁) + op(a, b₂)."
i tick)
else
// L3: op(-a, b) ≡ -op(a, b)
let l3Lhs = outANegB1.[tick]
let l3Rhs = negOut outA1B1.[tick]
if not (equalOut l3Lhs l3Rhs) then
result <-
Some (sprintf
"Sign-distribution broke at sample %d, tick %d: \
op(-a, b) != -op(a, b). This is the failure \
mode where a plugin claims IBilinearOperator \
but smuggles an additive offset; the three-term \
incremental-join rewrite will produce \
wrong-but-quiet results under retraction."
i tick)
// nosemgrep: plain-tick-increment -- method-local loop counter, not shared across threads
tick <- tick + 1
result)

/// Retraction completeness via state restoration.
/// A forward trace of random Z-sets is cancelled by its
/// elementwise negation; a continuation trace is then fed
Expand Down
72 changes: 72 additions & 0 deletions src/Core/PluginHarness.fs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,78 @@ module PluginHarness =
"PluginHarness: tick %d expected exactly one Publish; saw %d."
tick delta)
outputs.Add adapter.Value
// Strict-op post-step hook — see runTwoInputs for the
// rationale; same fix applies symmetrically to single-
// input strict plugins (e.g. IStrictOperator-tagged
// ops exercised via LawRunner.checkLinear).
let postVt = (adapter :> Op).AfterStepAsync ct
if not postVt.IsCompletedSuccessfully then
postVt.AsTask().GetAwaiter().GetResult()
// nosemgrep: plain-tick-increment -- method-local loop counter, not shared across threads
tick <- tick + 1
List.ofSeq outputs

/// Drive a two-input plugin operator through a pair of parallel
/// input sequences. Each tick advances BOTH inputs in lock-step;
/// the shorter sequence determines the run length (`Seq.zip`
/// semantics).
///
/// Used by `LawRunner.checkBilinear` to exercise per-argument
/// linearity and sign-distribution. Same exactly-one-`Publish`
/// assertion as `runSingleInput`.
let runTwoInputs<'TIn1, 'TIn2, 'TOut>
(makeOp: Stream<'TIn1> -> Stream<'TIn2> -> IOperator<'TOut>)
(inputs1: seq<'TIn1>)
(inputs2: seq<'TIn2>)
: 'TOut list =
let source1 = HarnessSourceOp<'TIn1>()
let source2 = HarnessSourceOp<'TIn2>()
// Synthetic ids — sources at 0+1, adapter at 2. Real circuits
// assign these during `Circuit.Build`; the harness mirrors
// the layout so the adapter's per-tick state semantics match
// production exactly.
source1.idField <- 0
source2.idField <- 1
let stream1 = Stream<'TIn1>(source1)
let stream2 = Stream<'TIn2>(source2)
let plugin = makeOp stream1 stream2

let inputOps : Op array =
plugin.ReadDependencies
|> Array.map (fun h -> h.op)
let adapter = PluginOperatorAdapter<'TOut>(plugin, inputOps)
adapter.idField <- 2

let outputs = ResizeArray<'TOut>()
let ct = CancellationToken.None
let mutable tick = 0
let zipped = Seq.zip inputs1 inputs2
for (in1, in2) in zipped do
source1.Feed in1
source2.Feed in2
let before = adapter.PublishCount.Value
let vt = (adapter :> Op).StepAsync ct
if not vt.IsCompletedSuccessfully then
vt.AsTask().GetAwaiter().GetResult()
Comment thread
AceHack marked this conversation as resolved.
let after = adapter.PublishCount.Value
let delta = after - before
if delta <> 1 then
invalidOp (
sprintf
"PluginHarness: tick %d expected exactly one Publish; saw %d."
tick delta)
outputs.Add adapter.Value
// Mirror Circuit.Step/StepAsync's strict-op post-step
// hook: after StepAsync completes, invoke AfterStepAsync
// so strict bilinear/stateful ops commit per-tick state
// before the next tick begins. Without this, plugins
// implementing IStrictOperator would see different
// semantics in `runTwoInputs` than in real circuit
// execution, and `LawRunner.checkBilinear` would
// validate against incorrect strict-op state.
let postVt = (adapter :> Op).AfterStepAsync ct
if not postVt.IsCompletedSuccessfully then
postVt.AsTask().GetAwaiter().GetResult()
// nosemgrep: plain-tick-increment -- method-local loop counter, not shared across threads
tick <- tick + 1
List.ofSeq outputs
Loading
Loading