diff --git a/apps/web/STYLE_GUIDE.md b/apps/web/STYLE_GUIDE.md index aceb4e4c675..d773439523b 100644 --- a/apps/web/STYLE_GUIDE.md +++ b/apps/web/STYLE_GUIDE.md @@ -364,6 +364,48 @@ const messageCount = messages.length; const COMPACTION_CIRCUIT_OPEN_MS = 30_000; ``` +### No migration-history comments + +Comments and docstrings describe **what the code is and does now**, not +what it replaced, the bug that motivated it, or how it used to be +written. Migration narrative belongs in the PR description and commit +message — it rots immediately and adds nothing for a future reader who +never saw the prior code. + +```ts +// Avoid — migration narrative; useless once the prior code is gone +/** + * Zustand store for stream lifecycle. Codifies what was previously an + * implicit state machine spread across use-event-stream.ts, + * use-message-reconciliation.ts, and use-stream-event-handler.ts — + * together with the four shared refs those hooks mutated. + */ + +// Avoid — references files/refs that no longer exist +/** + * Monotonic counter bumped on every open attempt. Replaces the old + * `streamEpochRef`. + */ +epoch: number; + +// Avoid — comments that frame current behavior as "the fix for X" +// The pre-existing bug was a 1s clock-based dedup race; here the +// state alone gates the transition. + +// Good — describes the current invariant +/** + * Monotonic counter bumped on every open attempt. Stream callbacks + * tag themselves with the epoch at open time so stale `OPEN_SUCCESS` + * / `OPEN_FAILURE` events (e.g. a slow fetch that resolves after the + * caller has moved on) are dropped by the reducer. + */ +epoch: number; +``` + +Same rule for test comments: explain the behavior the test asserts in +terms of the current code, not the bug it was originally written to +catch. + --- ## Formatting diff --git a/apps/web/src/domains/chat/stream-lifecycle-store.test.ts b/apps/web/src/domains/chat/stream-lifecycle-store.test.ts new file mode 100644 index 00000000000..92f0a33d13e --- /dev/null +++ b/apps/web/src/domains/chat/stream-lifecycle-store.test.ts @@ -0,0 +1,734 @@ +import { describe, expect, test } from "bun:test"; + +import { + INITIAL_STREAM_LIFECYCLE_STATE, + isStreamConnecting, + isStreamOpen, + isStreamPaused, + streamLifecycleReducer, + type DomainEvent, + type StreamContext, + type StreamLifecycleState, +} from "@/domains/chat/stream-lifecycle-store.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Apply a sequence of events to a state, returning the final state. */ +function applyEvents( + state: StreamLifecycleState, + events: DomainEvent[], +): StreamLifecycleState { + return events.reduce(streamLifecycleReducer, state); +} + +const ctxA: StreamContext = { + assistantId: "asst-a", + conversationKey: "conv-1", +}; + +const ctxB: StreamContext = { + assistantId: "asst-a", + conversationKey: "conv-2", +}; + +/** Walk from `closed` to a target phase using minimum events. */ +function openedState(context: StreamContext = ctxA): StreamLifecycleState { + return applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context }, + { type: "OPEN_SUCCESS", epoch: 1 }, + ]); +} + +function openingState(context: StreamContext = ctxA): StreamLifecycleState { + return streamLifecycleReducer(INITIAL_STREAM_LIFECYCLE_STATE, { + type: "OPEN_REQUEST", + context, + }); +} + +function retryingState(context: StreamContext = ctxA): StreamLifecycleState { + return applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context }, + { type: "OPEN_FAILURE", epoch: 1, message: "boom" }, + ]); +} + +function waitingState(context: StreamContext = ctxA): StreamLifecycleState { + return applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "APP_LIFECYCLE_CHANGE", source: "visibility", online: false }, + ]); +} + +function reconcilingState(context: StreamContext = ctxA): StreamLifecycleState { + return applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "RECONCILE_REQUEST" }, + ]); +} + +// --------------------------------------------------------------------------- +// Initial state +// --------------------------------------------------------------------------- + +describe("INITIAL_STREAM_LIFECYCLE_STATE", () => { + test("starts closed with no context, no failures, epoch 0", () => { + expect(INITIAL_STREAM_LIFECYCLE_STATE.phase).toBe("closed"); + expect(INITIAL_STREAM_LIFECYCLE_STATE.epoch).toBe(0); + expect(INITIAL_STREAM_LIFECYCLE_STATE.consecutiveFailures).toBe(0); + expect(INITIAL_STREAM_LIFECYCLE_STATE.context).toBeNull(); + expect(INITIAL_STREAM_LIFECYCLE_STATE.lastError).toBeNull(); + }); + + test("derived predicates report inactive lifecycle", () => { + expect(isStreamOpen(INITIAL_STREAM_LIFECYCLE_STATE)).toBe(false); + expect(isStreamConnecting(INITIAL_STREAM_LIFECYCLE_STATE)).toBe(false); + expect(isStreamPaused(INITIAL_STREAM_LIFECYCLE_STATE)).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// Derived helpers +// --------------------------------------------------------------------------- + +describe("derived helpers", () => { + test("isStreamOpen is true only in 'open'", () => { + expect(isStreamOpen(openedState())).toBe(true); + expect(isStreamOpen(openingState())).toBe(false); + expect(isStreamOpen(retryingState())).toBe(false); + expect(isStreamOpen(waitingState())).toBe(false); + expect(isStreamOpen(reconcilingState())).toBe(false); + }); + + test("isStreamConnecting is true in 'opening' and 'reconciling'", () => { + expect(isStreamConnecting(openingState())).toBe(true); + expect(isStreamConnecting(reconcilingState())).toBe(true); + expect(isStreamConnecting(openedState())).toBe(false); + expect(isStreamConnecting(retryingState())).toBe(false); + expect(isStreamConnecting(waitingState())).toBe(false); + }); + + test("isStreamPaused is true in 'waiting' and 'retrying'", () => { + expect(isStreamPaused(waitingState())).toBe(true); + expect(isStreamPaused(retryingState())).toBe(true); + expect(isStreamPaused(openedState())).toBe(false); + expect(isStreamPaused(openingState())).toBe(false); + expect(isStreamPaused(reconcilingState())).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// OPEN_REQUEST +// --------------------------------------------------------------------------- + +describe("OPEN_REQUEST", () => { + test("transitions closed → opening, bumping epoch and setting context", () => { + const state = streamLifecycleReducer(INITIAL_STREAM_LIFECYCLE_STATE, { + type: "OPEN_REQUEST", + context: ctxA, + }); + expect(state.phase).toBe("opening"); + expect(state.epoch).toBe(1); + expect(state.context).toEqual(ctxA); + expect(state.lastError).toBeNull(); + }); + + test("clears lastError carried over from a prior failure", () => { + const erroredFromRetry: StreamLifecycleState = { + ...retryingState(), + }; + expect(erroredFromRetry.lastError).toBe("boom"); + const next = streamLifecycleReducer(erroredFromRetry, { + type: "OPEN_REQUEST", + context: ctxA, + }); + expect(next.phase).toBe("opening"); + expect(next.lastError).toBeNull(); + }); + + test("dedups same-context request while opening", () => { + const opening = openingState(ctxA); + const next = streamLifecycleReducer(opening, { + type: "OPEN_REQUEST", + context: { ...ctxA }, + }); + expect(next).toBe(opening); + }); + + test("dedups same-context request while open", () => { + const open = openedState(ctxA); + const next = streamLifecycleReducer(open, { + type: "OPEN_REQUEST", + context: { ...ctxA }, + }); + expect(next).toBe(open); + }); + + test("conversation switch while open re-opens with bumped epoch", () => { + const open = openedState(ctxA); + const next = streamLifecycleReducer(open, { + type: "OPEN_REQUEST", + context: ctxB, + }); + expect(next.phase).toBe("opening"); + expect(next.epoch).toBe(open.epoch + 1); + expect(next.context).toEqual(ctxB); + }); + + test("conversation switch while opening replaces context and bumps epoch", () => { + const opening = openingState(ctxA); + const next = streamLifecycleReducer(opening, { + type: "OPEN_REQUEST", + context: ctxB, + }); + expect(next.phase).toBe("opening"); + expect(next.epoch).toBe(opening.epoch + 1); + expect(next.context).toEqual(ctxB); + }); + + test("from retrying: manual retry transitions to opening and clears lastError", () => { + const retrying = retryingState(); + const next = streamLifecycleReducer(retrying, { + type: "OPEN_REQUEST", + context: ctxA, + }); + expect(next.phase).toBe("opening"); + expect(next.epoch).toBe(retrying.epoch + 1); + expect(next.lastError).toBeNull(); + expect(next.consecutiveFailures).toBe(retrying.consecutiveFailures); + }); + + test("from waiting: explicit open transitions to opening", () => { + const waiting = waitingState(); + const next = streamLifecycleReducer(waiting, { + type: "OPEN_REQUEST", + context: ctxA, + }); + expect(next.phase).toBe("opening"); + expect(next.epoch).toBe(waiting.epoch + 1); + }); + + test("from reconciling: OPEN_REQUEST overrides pending reconcile and transitions to opening", () => { + const reconciling = reconcilingState(); + const next = streamLifecycleReducer(reconciling, { + type: "OPEN_REQUEST", + context: ctxA, + }); + // The dedup guard only fires for `opening` / `open`, so reconciling + // falls through and we transition to `opening` (epoch bumped). An + // explicit open overrides any pending reconcile. + expect(next.phase).toBe("opening"); + expect(next.epoch).toBe(reconciling.epoch + 1); + }); +}); + +// --------------------------------------------------------------------------- +// OPEN_SUCCESS +// --------------------------------------------------------------------------- + +describe("OPEN_SUCCESS", () => { + test("opening → open and resets consecutiveFailures", () => { + const opening = streamLifecycleReducer( + { ...INITIAL_STREAM_LIFECYCLE_STATE, consecutiveFailures: 4 }, + { type: "OPEN_REQUEST", context: ctxA }, + ); + expect(opening.consecutiveFailures).toBe(4); + const next = streamLifecycleReducer(opening, { + type: "OPEN_SUCCESS", + epoch: opening.epoch, + }); + expect(next.phase).toBe("open"); + expect(next.consecutiveFailures).toBe(0); + expect(next.lastError).toBeNull(); + }); + + test("ignores stale epoch (callback from a prior open attempt)", () => { + const opening = openingState(); + const next = streamLifecycleReducer(opening, { + type: "OPEN_SUCCESS", + epoch: opening.epoch - 1, + }); + expect(next).toBe(opening); + }); + + test("no-op when not in opening phase", () => { + const open = openedState(); + const next = streamLifecycleReducer(open, { + type: "OPEN_SUCCESS", + epoch: open.epoch, + }); + expect(next).toBe(open); + }); +}); + +// --------------------------------------------------------------------------- +// OPEN_FAILURE +// --------------------------------------------------------------------------- + +describe("OPEN_FAILURE", () => { + test("opening → retrying, increments consecutiveFailures, records error", () => { + const opening = openingState(); + const next = streamLifecycleReducer(opening, { + type: "OPEN_FAILURE", + epoch: opening.epoch, + message: "fetch failed", + }); + expect(next.phase).toBe("retrying"); + expect(next.consecutiveFailures).toBe(1); + expect(next.lastError).toBe("fetch failed"); + }); + + test("open → retrying when stream errors mid-flight", () => { + const open = openedState(); + const next = streamLifecycleReducer(open, { + type: "OPEN_FAILURE", + epoch: open.epoch, + message: "stream ended", + }); + expect(next.phase).toBe("retrying"); + expect(next.consecutiveFailures).toBe(1); + expect(next.lastError).toBe("stream ended"); + }); + + test("accumulates consecutiveFailures across repeated failures", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_FAILURE", epoch: 1, message: "first" }, + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_FAILURE", epoch: 2, message: "second" }, + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_FAILURE", epoch: 3, message: "third" }, + ]); + expect(final.phase).toBe("retrying"); + expect(final.consecutiveFailures).toBe(3); + expect(final.lastError).toBe("third"); + }); + + test("ignores stale epoch", () => { + const opening = openingState(); + const next = streamLifecycleReducer(opening, { + type: "OPEN_FAILURE", + epoch: opening.epoch - 1, + message: "stale", + }); + expect(next).toBe(opening); + }); + + test("no-op from closed/waiting/retrying/reconciling", () => { + for (const state of [ + INITIAL_STREAM_LIFECYCLE_STATE, + waitingState(), + retryingState(), + reconcilingState(), + ]) { + const next = streamLifecycleReducer(state, { + type: "OPEN_FAILURE", + epoch: state.epoch, + message: "shouldn't apply", + }); + expect(next).toBe(state); + } + }); +}); + +// --------------------------------------------------------------------------- +// RECONCILE_REQUEST +// --------------------------------------------------------------------------- + +describe("RECONCILE_REQUEST", () => { + test("open → reconciling (canonical pending-edits-before-reopen path)", () => { + const open = openedState(); + const next = streamLifecycleReducer(open, { type: "RECONCILE_REQUEST" }); + expect(next.phase).toBe("reconciling"); + // Epoch is NOT bumped here — that happens on RECONCILE_SUCCESS so + // the new open attempt gets its own clean epoch. + expect(next.epoch).toBe(open.epoch); + }); + + test("opening → reconciling", () => { + const opening = openingState(); + const next = streamLifecycleReducer(opening, { type: "RECONCILE_REQUEST" }); + expect(next.phase).toBe("reconciling"); + }); + + test("waiting → reconciling", () => { + const waiting = waitingState(); + const next = streamLifecycleReducer(waiting, { type: "RECONCILE_REQUEST" }); + expect(next.phase).toBe("reconciling"); + }); + + test("retrying → reconciling", () => { + const retrying = retryingState(); + const next = streamLifecycleReducer(retrying, { + type: "RECONCILE_REQUEST", + }); + expect(next.phase).toBe("reconciling"); + }); + + test("closed: no-op (nothing pending, nothing to reopen)", () => { + const next = streamLifecycleReducer(INITIAL_STREAM_LIFECYCLE_STATE, { + type: "RECONCILE_REQUEST", + }); + expect(next).toBe(INITIAL_STREAM_LIFECYCLE_STATE); + }); + + test("reconciling: no-op (already reconciling)", () => { + const reconciling = reconcilingState(); + const next = streamLifecycleReducer(reconciling, { + type: "RECONCILE_REQUEST", + }); + expect(next).toBe(reconciling); + }); +}); + +// --------------------------------------------------------------------------- +// RECONCILE_SUCCESS +// --------------------------------------------------------------------------- + +describe("RECONCILE_SUCCESS", () => { + test("reconciling → opening with a fresh epoch", () => { + const reconciling = reconcilingState(); + const next = streamLifecycleReducer(reconciling, { + type: "RECONCILE_SUCCESS", + }); + expect(next.phase).toBe("opening"); + expect(next.epoch).toBe(reconciling.epoch + 1); + expect(next.lastError).toBeNull(); + }); + + test("no-op from any phase that isn't reconciling", () => { + for (const state of [ + INITIAL_STREAM_LIFECYCLE_STATE, + openingState(), + openedState(), + waitingState(), + retryingState(), + ]) { + const next = streamLifecycleReducer(state, { + type: "RECONCILE_SUCCESS", + }); + expect(next).toBe(state); + } + }); +}); + +// --------------------------------------------------------------------------- +// RECONCILE_FAILURE +// --------------------------------------------------------------------------- + +describe("RECONCILE_FAILURE", () => { + test("reconciling → retrying, increments failures, records error", () => { + const reconciling = reconcilingState(); + const baseFailures = reconciling.consecutiveFailures; + const next = streamLifecycleReducer(reconciling, { + type: "RECONCILE_FAILURE", + message: "reconcile fetch errored", + }); + expect(next.phase).toBe("retrying"); + expect(next.consecutiveFailures).toBe(baseFailures + 1); + expect(next.lastError).toBe("reconcile fetch errored"); + }); + + test("no-op from any phase that isn't reconciling", () => { + for (const state of [ + INITIAL_STREAM_LIFECYCLE_STATE, + openingState(), + openedState(), + waitingState(), + retryingState(), + ]) { + const next = streamLifecycleReducer(state, { + type: "RECONCILE_FAILURE", + message: "shouldn't apply", + }); + expect(next).toBe(state); + } + }); +}); + +// --------------------------------------------------------------------------- +// CLOSE_REQUEST +// --------------------------------------------------------------------------- + +describe("CLOSE_REQUEST", () => { + test("from any non-closed phase transitions to closed and clears context", () => { + for (const state of [ + openingState(), + openedState(), + waitingState(), + retryingState(), + reconcilingState(), + ]) { + const next = streamLifecycleReducer(state, { + type: "CLOSE_REQUEST", + source: "unmount", + }); + expect(next.phase).toBe("closed"); + expect(next.context).toBeNull(); + } + }); + + test("close from closed remains closed (idempotent)", () => { + const next = streamLifecycleReducer(INITIAL_STREAM_LIFECYCLE_STATE, { + type: "CLOSE_REQUEST", + source: "unmount", + }); + expect(next.phase).toBe("closed"); + expect(next.context).toBeNull(); + }); + + test("preserves consecutiveFailures so a later retry can use the counter", () => { + const retrying = retryingState(); + expect(retrying.consecutiveFailures).toBe(1); + const next = streamLifecycleReducer(retrying, { + type: "CLOSE_REQUEST", + source: "deps_changed", + }); + expect(next.consecutiveFailures).toBe(1); + }); +}); + +// --------------------------------------------------------------------------- +// APP_LIFECYCLE_CHANGE — going offline / backgrounded +// --------------------------------------------------------------------------- + +describe("APP_LIFECYCLE_CHANGE (online=false)", () => { + test("open → waiting", () => { + const open = openedState(); + const next = streamLifecycleReducer(open, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: false, + }); + expect(next.phase).toBe("waiting"); + expect(next.epoch).toBe(open.epoch); + }); + + test("opening → waiting, bumps epoch so in-flight callbacks become stale", () => { + const opening = openingState(); + const next = streamLifecycleReducer(opening, { + type: "APP_LIFECYCLE_CHANGE", + source: "app_state", + online: false, + }); + expect(next.phase).toBe("waiting"); + expect(next.epoch).toBe(opening.epoch + 1); + }); + + test("dedups on already-paused phases (waiting, retrying)", () => { + for (const state of [waitingState(), retryingState()]) { + const next = streamLifecycleReducer(state, { + type: "APP_LIFECYCLE_CHANGE", + source: "reachability", + online: false, + }); + expect(next).toBe(state); + } + }); + + test("dedups from closed and reconciling", () => { + for (const state of [INITIAL_STREAM_LIFECYCLE_STATE, reconcilingState()]) { + const next = streamLifecycleReducer(state, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: false, + }); + expect(next).toBe(state); + } + }); + + test("dedup across sources: a second offline signal within the same paused phase is a no-op", () => { + // `visibilitychange` and Capacitor `appStateChange` can both fire on + // background; phase-gating ensures only the first transition lands. + const afterFirst = streamLifecycleReducer(openedState(), { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: false, + }); + const afterSecond = streamLifecycleReducer(afterFirst, { + type: "APP_LIFECYCLE_CHANGE", + source: "app_state", + online: false, + }); + expect(afterSecond).toBe(afterFirst); + }); +}); + +// --------------------------------------------------------------------------- +// APP_LIFECYCLE_CHANGE — coming online / foregrounded +// --------------------------------------------------------------------------- + +describe("APP_LIFECYCLE_CHANGE (online=true)", () => { + test("waiting → reconciling (resume reconciles before reopening)", () => { + const waiting = waitingState(); + const next = streamLifecycleReducer(waiting, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: true, + }); + expect(next.phase).toBe("reconciling"); + }); + + test("retrying → reconciling on a reachability flip", () => { + const retrying = retryingState(); + const next = streamLifecycleReducer(retrying, { + type: "APP_LIFECYCLE_CHANGE", + source: "reachability", + online: true, + }); + expect(next.phase).toBe("reconciling"); + }); + + test("no-op while already opening", () => { + const opening = openingState(); + const next = streamLifecycleReducer(opening, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: true, + }); + expect(next).toBe(opening); + }); + + test("no-op while already open", () => { + const open = openedState(); + const next = streamLifecycleReducer(open, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: true, + }); + expect(next).toBe(open); + }); + + test("no-op while reconciling", () => { + const reconciling = reconcilingState(); + const next = streamLifecycleReducer(reconciling, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: true, + }); + expect(next).toBe(reconciling); + }); + + test("no-op from closed (an explicit OPEN_REQUEST is required to start)", () => { + const next = streamLifecycleReducer(INITIAL_STREAM_LIFECYCLE_STATE, { + type: "APP_LIFECYCLE_CHANGE", + source: "visibility", + online: true, + }); + expect(next).toBe(INITIAL_STREAM_LIFECYCLE_STATE); + }); +}); + +// --------------------------------------------------------------------------- +// Canonical event sequences (end-to-end transition coverage) +// --------------------------------------------------------------------------- + +describe("canonical sequences", () => { + test("happy path: closed → opening → open", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 1 }, + ]); + expect(final.phase).toBe("open"); + expect(final.epoch).toBe(1); + expect(final.consecutiveFailures).toBe(0); + expect(final.context).toEqual(ctxA); + }); + + test("transient failure then recovery: open → retrying → reconciling → opening → open", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "OPEN_FAILURE", epoch: 1, message: "network blip" }, + { type: "APP_LIFECYCLE_CHANGE", source: "reachability", online: true }, + { type: "RECONCILE_SUCCESS" }, + { type: "OPEN_SUCCESS", epoch: 2 }, + ]); + expect(final.phase).toBe("open"); + expect(final.consecutiveFailures).toBe(0); + expect(final.lastError).toBeNull(); + }); + + test("backgrounded then resumed: open → waiting → reconciling → opening → open", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "APP_LIFECYCLE_CHANGE", source: "app_state", online: false }, + { type: "APP_LIFECYCLE_CHANGE", source: "app_state", online: true }, + { type: "RECONCILE_SUCCESS" }, + { type: "OPEN_SUCCESS", epoch: 2 }, + ]); + expect(final.phase).toBe("open"); + expect(final.epoch).toBe(2); + }); + + test("conversationExistsOnServer flips: reconcile fires before reopen", () => { + // When the conversation transitions to server-persisted mid-stream + // we have to flush pending local edits before reopening. The state + // machine enforces RECONCILE → opening ordering at the phase level. + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "RECONCILE_REQUEST" }, + { type: "RECONCILE_SUCCESS" }, + { type: "OPEN_SUCCESS", epoch: 2 }, + ]); + expect(final.phase).toBe("open"); + expect(final.epoch).toBe(2); + }); + + test("stale OPEN_SUCCESS arriving after a teardown is ignored", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + // Teardown while opening (page hidden) bumps epoch. + { type: "APP_LIFECYCLE_CHANGE", source: "visibility", online: false }, + // The original open's fetch finally resolves — but with the + // pre-teardown epoch. The reducer must ignore it. + { type: "OPEN_SUCCESS", epoch: 1 }, + ]); + expect(final.phase).toBe("waiting"); + }); + + test("backoff counter accumulates across failures, resets on success", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_FAILURE", epoch: 1, message: "1" }, + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_FAILURE", epoch: 2, message: "2" }, + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 3 }, + ]); + expect(final.phase).toBe("open"); + expect(final.consecutiveFailures).toBe(0); + expect(final.lastError).toBeNull(); + }); + + test("conversation switch while open: re-opens with new context", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "OPEN_REQUEST", context: ctxB }, + { type: "OPEN_SUCCESS", epoch: 2 }, + ]); + expect(final.phase).toBe("open"); + expect(final.context).toEqual(ctxB); + expect(final.epoch).toBe(2); + }); + + test("unmount during reconciling: lands in closed regardless", () => { + const final = applyEvents(INITIAL_STREAM_LIFECYCLE_STATE, [ + { type: "OPEN_REQUEST", context: ctxA }, + { type: "OPEN_SUCCESS", epoch: 1 }, + { type: "RECONCILE_REQUEST" }, + { type: "CLOSE_REQUEST", source: "unmount" }, + // Reconcile resolves after unmount — must be a no-op. + { type: "RECONCILE_SUCCESS" }, + ]); + expect(final.phase).toBe("closed"); + expect(final.context).toBeNull(); + }); +}); diff --git a/apps/web/src/domains/chat/stream-lifecycle-store.ts b/apps/web/src/domains/chat/stream-lifecycle-store.ts new file mode 100644 index 00000000000..e76df85c0d3 --- /dev/null +++ b/apps/web/src/domains/chat/stream-lifecycle-store.ts @@ -0,0 +1,428 @@ +/** + * Zustand store for the chat stream's network/state lifecycle. Owns the + * phase, the open-attempt epoch, the consecutive-failure counter, the + * active stream's conversation context, and the last error message. + * + * Phases: + * + * - `closed` — no stream, no fetch in flight, no intent to reopen + * - `opening` — open fetch is in flight + * - `open` — stream is established and receiving events + * - `waiting` — stream is torn down pending an app-lifecycle resume + * (page hidden, app backgrounded, network offline) + * - `reconciling`— a reconcile fetch must complete before the next open + * (precursor to reopen; ordering enforced by the reducer) + * - `retrying` — last open or reconcile failed; awaiting a reachability + * signal or a manual retry before re-attempting + * + * Domain events follow the [Flux-inspired + * practice](https://zustand.docs.pmnd.rs/learn/guides/flux-inspired-practice) + * naming convention: `on*` for system/SSE reactions, imperative for + * user/system-initiated transitions. The store exposes direct named + * actions per CONVENTIONS.md — no dispatcher. A pure + * `streamLifecycleReducer` is exported alongside for unit-testable state + * transitions; tests exercise the reducer, not the store. + * + * Wrapped with `createSelectors` so consumers read with atomic + * `useStreamLifecycleStore.use.phase()` selectors; non-React code reads + * synchronously via `useStreamLifecycleStore.getState()`. + * + * @see {@link https://zustand.docs.pmnd.rs/} + * @see {@link https://zustand.docs.pmnd.rs/learn/guides/flux-inspired-practice} + * @see {@link https://zustand.docs.pmnd.rs/learn/guides/auto-generating-selectors} + */ + +import { create } from "zustand"; + +import { createSelectors } from "@/utils/create-selectors.js"; + +// --------------------------------------------------------------------------- +// State +// --------------------------------------------------------------------------- + +export type StreamLifecyclePhase = + | "closed" + | "opening" + | "open" + | "waiting" + | "reconciling" + | "retrying"; + +export interface StreamContext { + assistantId: string; + conversationKey: string; +} + +export interface StreamLifecycleState { + phase: StreamLifecyclePhase; + /** + * Monotonic counter bumped on every transition that starts a new open + * attempt (`closed | waiting | retrying | reconciling | open(diff-ctx) → + * opening`). Stream callbacks tag themselves with the epoch at open + * time so the reducer can ignore `OPEN_SUCCESS` / `OPEN_FAILURE` events + * whose epoch is stale (e.g. a slow fetch that resolves after the + * caller has already moved on). + */ + epoch: number; + /** + * Number of consecutive open or reconcile failures. Resets to zero on + * `OPEN_SUCCESS`. The hook layer reads this to compute backoff; the + * reducer only tracks the counter. + */ + consecutiveFailures: number; + /** + * Identity of the conversation the stream is (or was last) bound to. + * Cleared on `CLOSE_REQUEST`. Used by the hook to decide whether an + * inbound `OPEN_REQUEST` is a conversation switch (different context) + * or a redundant re-open (same context). + */ + context: StreamContext | null; + /** Last error message reported via `OPEN_FAILURE` or `RECONCILE_FAILURE`. */ + lastError: string | null; +} + +export const INITIAL_STREAM_LIFECYCLE_STATE: StreamLifecycleState = { + phase: "closed", + epoch: 0, + consecutiveFailures: 0, + context: null, + lastError: null, +}; + +// --------------------------------------------------------------------------- +// Derived helpers +// --------------------------------------------------------------------------- + +/** True when the lifecycle holds an established stream. */ +export function isStreamOpen(state: StreamLifecycleState): boolean { + return state.phase === "open"; +} + +/** True when an open attempt is currently in flight. */ +export function isStreamConnecting(state: StreamLifecycleState): boolean { + return state.phase === "opening" || state.phase === "reconciling"; +} + +/** True when the lifecycle is paused awaiting external resumption. */ +export function isStreamPaused(state: StreamLifecycleState): boolean { + return state.phase === "waiting" || state.phase === "retrying"; +} + +// --------------------------------------------------------------------------- +// Domain events (pure reducer input — used by tests) +// --------------------------------------------------------------------------- + +export interface OpenRequest { + type: "OPEN_REQUEST"; + context: StreamContext; +} + +export interface OpenSuccess { + type: "OPEN_SUCCESS"; + epoch: number; +} + +export interface OpenFailure { + type: "OPEN_FAILURE"; + epoch: number; + message: string; +} + +export interface ReconcileRequest { + type: "RECONCILE_REQUEST"; +} + +export interface ReconcileSuccess { + type: "RECONCILE_SUCCESS"; +} + +export interface ReconcileFailure { + type: "RECONCILE_FAILURE"; + message: string; +} + +export interface CloseRequest { + type: "CLOSE_REQUEST"; + source: "unmount" | "deps_changed" | "lifecycle"; +} + +export type AppLifecycleSource = "visibility" | "app_state" | "reachability"; + +export interface AppLifecycleChange { + type: "APP_LIFECYCLE_CHANGE"; + source: AppLifecycleSource; + online: boolean; +} + +export type DomainEvent = + | OpenRequest + | OpenSuccess + | OpenFailure + | ReconcileRequest + | ReconcileSuccess + | ReconcileFailure + | CloseRequest + | AppLifecycleChange; + +// --------------------------------------------------------------------------- +// Guards +// --------------------------------------------------------------------------- + +function sameContext( + a: StreamContext | null, + b: StreamContext | null, +): boolean { + if (!a || !b) return false; + return ( + a.assistantId === b.assistantId && + a.conversationKey === b.conversationKey + ); +} + +// --------------------------------------------------------------------------- +// Actions +// --------------------------------------------------------------------------- + +export interface StreamLifecycleActions { + /** User/system requests opening the stream for a given conversation. */ + requestOpen: (context: StreamContext) => void; + /** Stream open fetch resolved successfully for the given epoch. */ + onOpenSuccess: (epoch: number) => void; + /** Stream open fetch (or active stream) failed for the given epoch. */ + onOpenFailure: (epoch: number, message: string) => void; + /** Trigger a reconcile that must complete before the next open. */ + requestReconcile: () => void; + /** Reconcile fetch completed successfully. */ + onReconcileSuccess: () => void; + /** Reconcile fetch failed. */ + onReconcileFailure: (message: string) => void; + /** User/system requests tearing the stream down. */ + requestClose: (source: CloseRequest["source"]) => void; + /** Visibility / app-state / reachability signal. Reducer dedups by phase. */ + onAppLifecycleChange: ( + source: AppLifecycleSource, + online: boolean, + ) => void; +} + +export type StreamLifecycleStore = StreamLifecycleState & StreamLifecycleActions; + +// --------------------------------------------------------------------------- +// Store +// --------------------------------------------------------------------------- + +const useStreamLifecycleStoreBase = create()((set) => ({ + ...INITIAL_STREAM_LIFECYCLE_STATE, + + // ----- Open flow ----- + + requestOpen: (context) => + set((s) => { + // Dedup: an OPEN_REQUEST for the same context while already + // opening or open is a no-op. Different context (conversation + // switch) bumps the epoch and restarts the open. + if ( + (s.phase === "opening" || s.phase === "open") && + sameContext(s.context, context) + ) { + return s; + } + return { + phase: "opening" as const, + epoch: s.epoch + 1, + context, + lastError: null, + }; + }), + + onOpenSuccess: (epoch) => + set((s) => { + // Stale callback from a prior open attempt — ignore. The current + // attempt's success will arrive separately. + if (epoch !== s.epoch) return s; + if (s.phase !== "opening") return s; + return { + phase: "open" as const, + consecutiveFailures: 0, + lastError: null, + }; + }), + + onOpenFailure: (epoch, message) => + set((s) => { + if (epoch !== s.epoch) return s; + // Failures can arrive while opening (fetch rejected) or while + // open (stream errored mid-flight). Both transition to retrying. + if (s.phase !== "opening" && s.phase !== "open") return s; + return { + phase: "retrying" as const, + consecutiveFailures: s.consecutiveFailures + 1, + lastError: message, + }; + }), + + // ----- Reconcile flow ----- + + requestReconcile: () => + set((s) => { + // Reconcile is a precursor to reopen. Valid from any phase that + // could hold pending local edits or expect to reopen soon. From + // `closed` it's a no-op (nothing to reconcile against and nothing + // pending to reopen). + if (s.phase === "closed" || s.phase === "reconciling") return s; + return { phase: "reconciling" as const }; + }), + + onReconcileSuccess: () => + set((s) => { + if (s.phase !== "reconciling") return s; + // Reconcile completed; reopen with a fresh epoch so callbacks + // from any stale in-flight stream are discarded. + return { + phase: "opening" as const, + epoch: s.epoch + 1, + lastError: null, + }; + }), + + onReconcileFailure: (message) => + set((s) => { + if (s.phase !== "reconciling") return s; + return { + phase: "retrying" as const, + consecutiveFailures: s.consecutiveFailures + 1, + lastError: message, + }; + }), + + // ----- Close ----- + + requestClose: (_source) => + set({ + phase: "closed", + context: null, + }), + + // ----- App lifecycle (visibility, app state, reachability) ----- + + onAppLifecycleChange: (_source, online) => + set((s) => { + if (!online) { + // Going offline / backgrounded: only `open` and `opening` + // have something to tear down. Already-paused phases ignore. + if (s.phase === "open") return { phase: "waiting" as const }; + if (s.phase === "opening") { + // Abort the in-flight open by bumping epoch so callbacks + // become stale, and park in `waiting` to resume later. + return { + phase: "waiting" as const, + epoch: s.epoch + 1, + }; + } + return s; + } + // Coming online / foregrounded: resume from paused phases by + // reconciling pending local edits before the next open. From + // `closed` we wait for an explicit `requestOpen` from the hook. + if (s.phase === "waiting" || s.phase === "retrying") { + return { phase: "reconciling" as const }; + } + return s; + }), +})); + +export const useStreamLifecycleStore = createSelectors( + useStreamLifecycleStoreBase, +); + +// --------------------------------------------------------------------------- +// Pure reducer (used by tests to verify state transitions in isolation) +// --------------------------------------------------------------------------- + +export function streamLifecycleReducer( + state: StreamLifecycleState, + event: DomainEvent, +): StreamLifecycleState { + switch (event.type) { + case "OPEN_REQUEST": + if ( + (state.phase === "opening" || state.phase === "open") && + sameContext(state.context, event.context) + ) { + return state; + } + return { + ...state, + phase: "opening", + epoch: state.epoch + 1, + context: event.context, + lastError: null, + }; + + case "OPEN_SUCCESS": + if (event.epoch !== state.epoch) return state; + if (state.phase !== "opening") return state; + return { + ...state, + phase: "open", + consecutiveFailures: 0, + lastError: null, + }; + + case "OPEN_FAILURE": + if (event.epoch !== state.epoch) return state; + if (state.phase !== "opening" && state.phase !== "open") return state; + return { + ...state, + phase: "retrying", + consecutiveFailures: state.consecutiveFailures + 1, + lastError: event.message, + }; + + case "RECONCILE_REQUEST": + if (state.phase === "closed" || state.phase === "reconciling") { + return state; + } + return { ...state, phase: "reconciling" }; + + case "RECONCILE_SUCCESS": + if (state.phase !== "reconciling") return state; + return { + ...state, + phase: "opening", + epoch: state.epoch + 1, + lastError: null, + }; + + case "RECONCILE_FAILURE": + if (state.phase !== "reconciling") return state; + return { + ...state, + phase: "retrying", + consecutiveFailures: state.consecutiveFailures + 1, + lastError: event.message, + }; + + case "CLOSE_REQUEST": + return { + ...state, + phase: "closed", + context: null, + }; + + case "APP_LIFECYCLE_CHANGE": + if (!event.online) { + if (state.phase === "open") { + return { ...state, phase: "waiting" }; + } + if (state.phase === "opening") { + return { ...state, phase: "waiting", epoch: state.epoch + 1 }; + } + return state; + } + if (state.phase === "waiting" || state.phase === "retrying") { + return { ...state, phase: "reconciling" }; + } + return state; + } +}