Add createAsyncGeneratorWithInitialValueAndSlotTracking#1536
Conversation
🦋 Changeset detectedLatest commit: 8856d01 The changes in this PR will be included in the next version bump. This PR includes changesets to release 46 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
BundleMonUnchanged files (142)
No change in files bundle size Final result: ✅ View report in BundleMon website ➡️ |
|
Documentation Preview: https://kit-docs-1mm159iu9-anza-tech.vercel.app |
trevor-cortex
left a comment
There was a problem hiding this comment.
Summary
Adds createAsyncGeneratorWithInitialValueAndSlotTracking, an async generator API that merges an initial RPC response with an ongoing subscription into a single stream, using slot numbers to silently drop out-of-order (stale) values. It's the async-iterator analogue of the existing createReactiveStoreWithInitialValueAndSlotTracking.
The implementation looks sound overall: single-threaded JS runtime + synchronous promise .then continuations mean the slot-comparison-and-update sequence in each source handler is race-free, which is the right intuition noted in the PR description. Test coverage is thorough — yield ordering, slot dropping from both directions, buffering, error propagation from each source, abort-signal plumbing, early abort, early break cleanup. Nice.
Things to watch out for
-
Behavioral divergence from the reactive store when the subscription ends before the RPC resolves. The subscription's
.thenhandler unconditionally setsdone = trueonce the iterator completes, which terminates the generator and discards any RPC response still in flight (the laterenqueuecall is gated byif (done || signal.aborted) return). The reactive store doesn't do this — the RPC can still populate the store after the subscription ends. This isn't necessarily wrong (and the JSDoc does say "completes when the subscription ends"), but it's an easy edge case to miss, and there's no test for "subscription completes while RPC is still pending". Worth adding one to pin the intended semantics. -
Queue drain on abort. When the abort signal fires,
onAbortsetsdone = truebut leaves any already-buffered items inqueue. If a consumer calls.next()after abort, they'll still receive buffered values beforedoneis observed. Theyields values received before abort, then completestest happens to exercise a similar path but collects the value before callingabort(), so the actual drain-after-abort behavior isn't asserted. Either is defensible, but please make the intent explicit with a test. -
Minor code smells (see inline comments): a redundant
pendingErrorcheck, awaitingReject = nullmissing from one branch (benign — resolve-after-settle is a no-op), an unusedreactivemock field, and an inconsistentas SolanaRpcResponse<TestValue>cast in the new test file vs the one being removed in the existing file in the same PR.
Notes for subsequent reviewers
- The slot-ordering/race reasoning hinges on the fact that
check lastUpdateSlot→update lastUpdateSlot→enqueueruns without any interveningawaitin each source handler. Worth double-checking that reasoning if anyone refactors this to introduce anawaitbetween those steps. - The subscription-completion semantics (point 1 above) is the main design question I'd want a second opinion on. Everything else is polish.
e87866b to
eb14b56
Compare
trevor-cortex
left a comment
There was a problem hiding this comment.
Re-review
The main design concern from the previous review — premature termination when the subscription ends before the RPC resolves — has been fixed cleanly with the rpcDone/subscriptionDone flags and the markSourcesDone() helper. The waitingReject = null inconsistency is also fixed. 👍
However, two tests will hang under the new semantics because they were updated to not resolve the RPC but still expect the generator to complete after complete(). Details inline. These should fail / time out in CI — worth running the suite locally to confirm.
Things to watch out for
- The two hanging tests (inline) are the main blocker.
- Please add a test for the specific fix: subscription completes before RPC resolves, then RPC resolves, and the yielded values include the RPC response. This directly exercises the behavior change and would have caught the hanging-test issue too.
- Minor: the
reactivefield on the mock is still unused, and theas SolanaRpcResponse<TestValue>cast in the test helper is still present even though the reactive store test removes its equivalent in this same PR.
For subsequent reviewers
- Focus on the termination logic in the two
.then(() => { ... markSourcesDone(); })branches and the interaction withhandleError/ abort. - Verify that the test suite actually passes — the two tests I flagged look like they hang rather than fail loudly.
| if (slot < lastUpdateSlot) continue; | ||
| lastUpdateSlot = slot; | ||
| enqueue({ context: { slot }, value: rpcSubscriptionValueMapper(value) }); | ||
| } |
There was a problem hiding this comment.
Nice fix on the termination semantics — this correctly waits for both sources now and matches the reactive store behavior. Consider adding a dedicated test that exercises the specific case: subscription completes (with or without yielding), then RPC resolves later, and the RPC value is still yielded. That directly covers the behavior change and makes the intent explicit.
lorisleiva
left a comment
There was a problem hiding this comment.
Jesus my brain hurts haha. Looks good though. That being said, I have more faith in Trevor than me on that one. 😅
| let resolve!: (response: SolanaRpcResponse<TestValue>) => void; | ||
| let reject!: (error: unknown) => void; | ||
| const promise = new Promise<SolanaRpcResponse<TestValue>>((res, rej) => { | ||
| resolve = res; | ||
| reject = rej; | ||
| }); |
There was a problem hiding this comment.
nit: you can use Promise.withResolvers here.
let { promise, resolve, reject } = Promise<SolanaRpcResponse<TestValue>>.withResolvers();There was a problem hiding this comment.
Nice I forgot about that, thanks! I had to add lib: "ES2024" to make this work, so just making sure that works in CI 🤞
|
🔎💬 Inkeep AI search and chat service is syncing content for source 'Solana Kit Docs' |
|
Because there has been no activity on this PR for 14 days since it was merged, it has been automatically locked. Please open a new issue if it requires a follow up. |
Summary of Changes
This PR adds
createAsyncGeneratorWithInitialValueAndSlotTracking. Same idea as the reactive store version, but presented as an async generator API.It allows you to take any RPC request and subscription with a
SolanaRpcResponseshape and that can map to the same value, and iterate over them while silently dropping any information from an older slot than has already been received.While the store only needs to provide the most recent data, this requires us to queue all responses received (from either source) in order, where the slot is not older than the previous last seen slot.
Note that avoiding race conditions is mostly implicit on the JS single threaded runtime, and the fact that Promise
thenblocks are synchronous.