diff --git a/.changeset/giant-falcons-invite.md b/.changeset/giant-falcons-invite.md new file mode 100644 index 000000000..c7cb927de --- /dev/null +++ b/.changeset/giant-falcons-invite.md @@ -0,0 +1,5 @@ +--- +'@solana/kit': minor +--- + +Add `createReactiveStoreWithInitialValueAndSlotTracking()`, a helper that combines an initial RPC fetch with an ongoing subscription into a single `ReactiveStore`. Uses slot-based comparison to ensure only the most recent value is kept, regardless of arrival order. Compatible with `useSyncExternalStore`, Svelte stores, and other reactive primitives. diff --git a/examples/react-app/src/functions/balance.ts b/examples/react-app/src/functions/balance.ts index bf62bc7a2..3a5334310 100644 --- a/examples/react-app/src/functions/balance.ts +++ b/examples/react-app/src/functions/balance.ts @@ -1,18 +1,21 @@ -import { AccountNotificationsApi, Address, GetBalanceApi, Lamports, Rpc, RpcSubscriptions } from '@solana/kit'; +import { + AccountNotificationsApi, + Address, + createReactiveStoreWithInitialValueAndSlotTracking, + GetBalanceApi, + Lamports, + Rpc, + RpcSubscriptions, +} from '@solana/kit'; import { SWRSubscription } from 'swr/subscription'; -const EXPLICIT_ABORT_TOKEN = Symbol(); - /** * This is an example of a strategy to fetch some account data and to keep it up to date over time. * It's implemented as an SWR subscription function (https://swr.vercel.app/docs/subscription) but * the approach is generalizable. * - * 1. Fetch the current account state and publish it to the consumer - * 2. Subscribe to account data notifications and publish them to the consumer - * - * At all points in time, check that the update you received -- no matter from where -- is from a - * higher slot (ie. is newer) than the last one you published to the consumer. + * It uses {@link createReactiveStoreWithInitialValueAndSlotTracking} to combine an initial RPC fetch with an + * ongoing subscription, using slot-based comparison to ensure only the latest value is published. */ export function balanceSubscribe( rpc: Rpc, @@ -21,53 +24,22 @@ export function balanceSubscribe( ) { const [{ address }, { next }] = subscriptionArgs; const abortController = new AbortController(); - // Keep track of the slot of the last-published update. - let lastUpdateSlot = -1n; - // Fetch the current balance of this account. - rpc.getBalance(address, { commitment: 'confirmed' }) - .send({ abortSignal: abortController.signal }) - .then(({ context: { slot }, value: lamports }) => { - if (slot < lastUpdateSlot) { - // The last-published update (ie. from the subscription) is newer than this one. - return; - } - lastUpdateSlot = slot; - next(null /* err */, lamports /* data */); - }) - .catch(e => { - if (e !== EXPLICIT_ABORT_TOKEN) { - next(e /* err */); - } - }); - // Subscribe for updates to that balance. - rpcSubscriptions - .accountNotifications(address) - .subscribe({ abortSignal: abortController.signal }) - .then(async accountInfoNotifications => { - try { - for await (const { - context: { slot }, - value: { lamports }, - } of accountInfoNotifications) { - if (slot < lastUpdateSlot) { - // The last-published update (ie. from the initial fetch) is newer than this - // one. - continue; - } - lastUpdateSlot = slot; - next(null /* err */, lamports /* data */); - } - } catch (e) { - next(e /* err */); - } - }) - .catch(e => { - if (e !== EXPLICIT_ABORT_TOKEN) { - next(e /* err */); - } - }); - // Return a cleanup callback that aborts the RPC call/subscription. + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest: rpc.getBalance(address, { commitment: 'confirmed' }), + rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(address), + rpcSubscriptionValueMapper: ({ lamports }) => lamports, + rpcValueMapper: lamports => lamports, + }); + store.subscribe(() => { + const error = store.getError(); + if (error) { + next(error as Error); + } else { + next(null, store.getState()); + } + }); return () => { - abortController.abort(EXPLICIT_ABORT_TOKEN); + abortController.abort(); }; } diff --git a/packages/kit/README.md b/packages/kit/README.md index 736d4ce18..a3d17ce50 100644 --- a/packages/kit/README.md +++ b/packages/kit/README.md @@ -38,6 +38,39 @@ await airdrop({ > [!NOTE] This only works on test clusters. +### `createReactiveStoreWithInitialValueAndSlotTracking(config)` + +Creates a `ReactiveStore` that combines an initial RPC fetch with an ongoing subscription to keep its state up to date. Uses slot-based comparison to ensure only the most recent value is kept, regardless of whether it came from the RPC response or a subscription notification. + +The returned store is compatible with React's `useSyncExternalStore`, Svelte stores, Solid's `from()`, and any other reactive primitive that expects a `{ subscribe, getState }` contract. + +```ts +import { + address, + createReactiveStoreWithInitialValueAndSlotTracking, + createSolanaRpc, + createSolanaRpcSubscriptions, +} from '@solana/kit'; + +const rpc = createSolanaRpc('http://127.0.0.1:8899'); +const rpcSubscriptions = createSolanaRpcSubscriptions('ws://127.0.0.1:8900'); +const myAddress = address('FnHyam9w4NZoWR6mKN1CuGBritdsEWZQa4Z4oawLZGxa'); + +const balanceStore = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: AbortSignal.timeout(60_000), + rpcRequest: rpc.getBalance(myAddress, { commitment: 'confirmed' }), + rpcValueMapper: lamports => lamports, + rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(myAddress), + rpcSubscriptionValueMapper: ({ lamports }) => lamports, +}); + +const unsubscribe = balanceStore.subscribe(() => { + const error = balanceStore.getError(); + if (error) console.error('Error:', error); + else console.log('Balance:', balanceStore.getState()); +}); +``` + ### `decompileTransactionMessageFetchingLookupTables(compiledTransactionMessage, rpc, config)` Returns a `TransactionMessage` from a `CompiledTransactionMessage`. If any of the accounts in the compiled message require an address lookup table to find their address, this function will use the supplied RPC instance to fetch the contents of the address lookup table from the network. diff --git a/packages/kit/package.json b/packages/kit/package.json index 553772ff6..2c3ac0f29 100644 --- a/packages/kit/package.json +++ b/packages/kit/package.json @@ -119,6 +119,7 @@ "@solana/rpc-parsed-types": "workspace:*", "@solana/rpc-spec-types": "workspace:*", "@solana/rpc-subscriptions": "workspace:*", + "@solana/subscribable": "workspace:*", "@solana/rpc-types": "workspace:*", "@solana/signers": "workspace:*", "@solana/sysvars": "workspace:*", diff --git a/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts b/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts new file mode 100644 index 000000000..1e8ac9e49 --- /dev/null +++ b/packages/kit/src/__tests__/create-reactive-store-with-initial-value-and-slot-tracking-test.ts @@ -0,0 +1,539 @@ +import type { PendingRpcRequest } from '@solana/rpc'; +import type { PendingRpcSubscriptionsRequest } from '@solana/rpc-subscriptions'; +import type { SolanaRpcResponse } from '@solana/rpc-types'; + +import { createReactiveStoreWithInitialValueAndSlotTracking } from '../create-reactive-store-with-initial-value-and-slot-tracking'; + +/** Flush all pending microtasks by waiting for a macrotask boundary. */ +const flushMicrotasks = () => new Promise(resolve => setTimeout(resolve, 0)); + +type TestValue = { count: number }; + +function createMockRpcRequest(): { + mockRequest: PendingRpcRequest>; + reject(error: unknown): void; + resolve(response: SolanaRpcResponse): void; +} { + let resolve!: (response: SolanaRpcResponse) => void; + let reject!: (error: unknown) => void; + const promise = new Promise>((res, rej) => { + resolve = res; + reject = rej; + }); + return { + mockRequest: { send: jest.fn().mockReturnValue(promise) }, + reject, + resolve, + }; +} + +function createMockSubscriptionRequest(): { + complete(): void; + error(err: unknown): void; + mockRequest: PendingRpcSubscriptionsRequest>; + pushNotification(notification: SolanaRpcResponse): void; +} { + const notifications: SolanaRpcResponse[] = []; + let waitingResolve: ((value: IteratorResult>) => void) | null = null; + let waitingReject: ((reason: unknown) => void) | null = null; + let done = false; + let errorValue: unknown; + let hasError = false; + + const asyncIterable: AsyncIterable> = { + [Symbol.asyncIterator]() { + return { + next() { + if (notifications.length > 0) { + return Promise.resolve({ done: false, value: notifications.shift()! } as const); + } + if (done) { + return Promise.resolve({ done: true, value: undefined } as const); + } + if (hasError) { + return Promise.reject(errorValue as Error); + } + return new Promise>>((resolve, reject) => { + waitingResolve = resolve; + waitingReject = reject; + }); + }, + }; + }, + }; + + const pushNotification = (notification: SolanaRpcResponse) => { + if (waitingResolve) { + const resolve = waitingResolve; + waitingResolve = null; + resolve({ done: false, value: notification }); + } else { + notifications.push(notification); + } + }; + + const error = (err: unknown) => { + hasError = true; + errorValue = err; + if (waitingReject) { + const reject = waitingReject; + waitingResolve = null; + waitingReject = null; + reject(err); + } + }; + + const complete = () => { + done = true; + if (waitingResolve) { + const resolve = waitingResolve; + waitingResolve = null; + resolve({ done: true, value: undefined }); + } + }; + + return { + complete, + error, + mockRequest: { + reactive: jest.fn().mockRejectedValue(new Error('not implemented')), + subscribe: jest.fn().mockResolvedValue(asyncIterable), + }, + pushNotification, + }; +} + +function rpcResponse(slot: number, value: TestValue): SolanaRpcResponse { + return { context: { slot: BigInt(slot) }, value } as SolanaRpcResponse; +} + +describe('createReactiveStoreWithInitialValueAndSlotTracking', () => { + let abortController: AbortController; + + beforeEach(() => { + abortController = new AbortController(); + }); + + afterEach(() => { + abortController.abort(); + }); + + describe('getState()', () => { + it('returns `undefined` before any data arrives', () => { + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + expect(store.getState()).toBeUndefined(); + }); + it('updates with the RPC response value', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + resolve(rpcResponse(100, { count: 42 })); + await flushMicrotasks(); + expect(store.getState()).toBe(42); + }); + it('updates with a subscription notification value', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, pushNotification } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await flushMicrotasks(); + pushNotification(rpcResponse(100, { count: 99 })); + await flushMicrotasks(); + expect(store.getState()).toBe(99); + }); + it('ignores the RPC response when a newer subscription notification has already arrived', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, pushNotification } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await flushMicrotasks(); + pushNotification(rpcResponse(200, { count: 99 })); + await flushMicrotasks(); + // RPC response arrives later at an older slot + resolve(rpcResponse(100, { count: 42 })); + await flushMicrotasks(); + expect(store.getState()).toBe(99); + }); + it('ignores a subscription notification when the RPC response was at a newer slot', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, pushNotification } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + resolve(rpcResponse(200, { count: 42 })); + await flushMicrotasks(); + pushNotification(rpcResponse(100, { count: 99 })); + await flushMicrotasks(); + expect(store.getState()).toBe(42); + }); + it('preserves the last known value after an error', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + resolve(rpcResponse(100, { count: 42 })); + await flushMicrotasks(); + error(new Error('subscription failed')); + await flushMicrotasks(); + expect(store.getState()).toBe(42); + }); + }); + + describe('getError()', () => { + it('returns `undefined` before any error', () => { + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + expect(store.getError()).toBeUndefined(); + }); + it('captures an error from the RPC request', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, reject } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const error = new Error('rpc failed'); + reject(error); + await flushMicrotasks(); + expect(store.getError()).toBe(error); + }); + it('captures an error from the subscription', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await flushMicrotasks(); + const subscriptionError = new Error('subscription failed'); + error(subscriptionError); + await flushMicrotasks(); + expect(store.getError()).toBe(subscriptionError); + }); + it('only captures the first error when RPC fails then subscription fails', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, reject: rejectRpc } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error: errorSubscription } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await flushMicrotasks(); + rejectRpc(new Error('rpc error')); + await flushMicrotasks(); + errorSubscription(new Error('subscription error')); + await flushMicrotasks(); + expect(store.getError()).toEqual(new Error('rpc error')); + }); + it('only captures the first error when subscription fails then RPC fails', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, reject: rejectRpc } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error: errorSubscription } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await flushMicrotasks(); + errorSubscription(new Error('subscription error')); + await flushMicrotasks(); + rejectRpc(new Error('rpc error')); + await flushMicrotasks(); + expect(store.getError()).toEqual(new Error('subscription error')); + }); + }); + + describe('subscribe()', () => { + it('calls the subscriber when the RPC response arrives', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + resolve(rpcResponse(100, { count: 42 })); + await flushMicrotasks(); + expect(subscriber).toHaveBeenCalledTimes(1); + }); + it('calls the subscriber when a subscription notification arrives', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, pushNotification } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + await flushMicrotasks(); + pushNotification(rpcResponse(100, { count: 99 })); + await flushMicrotasks(); + expect(subscriber).toHaveBeenCalledTimes(1); + }); + it('does not call the subscriber when an out-of-order notification is skipped', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, pushNotification } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + resolve(rpcResponse(200, { count: 42 })); + await flushMicrotasks(); + subscriber.mockClear(); + await flushMicrotasks(); + // This notification is at an older slot and should be skipped + pushNotification(rpcResponse(100, { count: 99 })); + await flushMicrotasks(); + expect(subscriber).not.toHaveBeenCalled(); + }); + it('calls the subscriber when an error occurs', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, reject } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + reject(new Error('fail')); + await flushMicrotasks(); + expect(subscriber).toHaveBeenCalledTimes(1); + }); + it('calls the subscriber when a subscription error occurs', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + await flushMicrotasks(); + error(new Error('fail')); + await flushMicrotasks(); + expect(subscriber).toHaveBeenCalledTimes(1); + }); + it('stops calling the subscriber after unsubscribe', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + const unsubscribe = store.subscribe(subscriber); + unsubscribe(); + resolve(rpcResponse(100, { count: 42 })); + await flushMicrotasks(); + expect(subscriber).not.toHaveBeenCalled(); + }); + it('the unsubscribe function is idempotent', () => { + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const unsubscribe = store.subscribe(jest.fn()); + expect(() => { + unsubscribe(); + unsubscribe(); + }).not.toThrow(); + }); + }); + + describe('abort signal', () => { + it('aborts the signal passed to the RPC request when the caller aborts', () => { + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const rpcSignal = (rpcRequest.send as jest.Mock).mock.calls[0][0].abortSignal; + expect(rpcSignal.aborted).toBe(false); + abortController.abort('test reason'); + expect(rpcSignal.aborted).toBe(true); + expect(rpcSignal.reason).toBe('test reason'); + }); + it('aborts the signal passed to the subscription request when the caller aborts', () => { + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriptionSignal = (rpcSubscriptionRequest.subscribe as jest.Mock).mock.calls[0][0].abortSignal; + expect(subscriptionSignal.aborted).toBe(false); + abortController.abort('test reason'); + expect(subscriptionSignal.aborted).toBe(true); + expect(subscriptionSignal.reason).toBe('test reason'); + }); + it('swallows errors from the RPC request when the caller aborts', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest, reject } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + abortController.abort(); + reject(new Error('aborted')); + await flushMicrotasks(); + expect(store.getError()).toBeUndefined(); + }); + it('swallows errors from the subscription when the caller aborts', async () => { + expect.assertions(1); + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, error } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + await flushMicrotasks(); + abortController.abort(); + error(new Error('aborted')); + await flushMicrotasks(); + expect(store.getError()).toBeUndefined(); + }); + it('does not update state when the RPC response arrives after abort', async () => { + expect.assertions(2); + const { mockRequest: rpcRequest, resolve } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + abortController.abort(); + resolve(rpcResponse(100, { count: 42 })); + await flushMicrotasks(); + expect(store.getState()).toBeUndefined(); + expect(subscriber).not.toHaveBeenCalled(); + }); + it('does not update state when a subscription notification arrives after abort', async () => { + expect.assertions(2); + const { mockRequest: rpcRequest } = createMockRpcRequest(); + const { mockRequest: rpcSubscriptionRequest, pushNotification } = createMockSubscriptionRequest(); + const store = createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal: abortController.signal, + rpcRequest, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper: v => v.count, + rpcValueMapper: v => v.count, + }); + const subscriber = jest.fn(); + store.subscribe(subscriber); + await flushMicrotasks(); + abortController.abort(); + pushNotification(rpcResponse(100, { count: 99 })); + await flushMicrotasks(); + expect(store.getState()).toBeUndefined(); + expect(subscriber).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts b/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts new file mode 100644 index 000000000..141e0e631 --- /dev/null +++ b/packages/kit/src/create-reactive-store-with-initial-value-and-slot-tracking.ts @@ -0,0 +1,159 @@ +import type { PendingRpcRequest } from '@solana/rpc'; +import type { PendingRpcSubscriptionsRequest } from '@solana/rpc-subscriptions'; +import type { SolanaRpcResponse } from '@solana/rpc-types'; +import type { ReactiveStore } from '@solana/subscribable'; + +type CreateReactiveStoreWithInitialValueAndSlotTrackingConfig = Readonly<{ + /** + * Triggering this abort signal will cancel the pending RPC request and subscription, and + * disconnect the store from further updates. + */ + abortSignal: AbortSignal; + /** + * A pending RPC request whose response will be used to set the store's initial state. + * The response must be a {@link SolanaRpcResponse} so that its slot can be compared with + * subscription notifications. + */ + rpcRequest: PendingRpcRequest>; + /** + * A pending RPC subscription request whose notifications will be used to keep the store + * up to date. Each notification must be a {@link SolanaRpcResponse} so that its slot can be + * compared with the initial RPC response and other notifications. + */ + rpcSubscriptionRequest: PendingRpcSubscriptionsRequest>; + /** + * Maps the value from a subscription notification to the item type stored in the reactive store. + */ + rpcSubscriptionValueMapper: (value: TSubscriptionValue) => TItem; + /** + * Maps the value from the RPC response to the item type stored in the reactive store. + */ + rpcValueMapper: (value: TRpcValue) => TItem; +}>; + +/** + * Creates a {@link ReactiveStore} that combines an initial RPC fetch with an ongoing subscription + * to keep its state up to date. + * + * The store uses slot-based comparison to ensure that only the most recent value is kept, + * regardless of whether it came from the initial RPC response or a subscription notification. + * This prevents stale data from overwriting newer data when the RPC response and subscription + * notifications arrive out of order. + * + * Things to note: + * + * - `getState()` returns `undefined` until the first response or notification arrives. + * - On error from either source, `getState()` continues to return the last known value and + * `getError()` returns the error. Only the first error is captured. + * - When an error occurs, the abort signal is triggered, cancelling both the RPC request and + * the subscription. + * - Triggering the caller's abort signal disconnects the store from both sources. + * + * @param config + * + * @example + * ```ts + * import { + * address, + * createReactiveStoreWithInitialValueAndSlotTracking, + * createSolanaRpc, + * createSolanaRpcSubscriptions, + * } from '@solana/kit'; + * + * const rpc = createSolanaRpc('http://127.0.0.1:8899'); + * const rpcSubscriptions = createSolanaRpcSubscriptions('ws://127.0.0.1:8900'); + * const myAddress = address('FnHyam9w4NZoWR6mKN1CuGBritdsEWZQa4Z4oawLZGxa'); + * + * const balanceStore = createReactiveStoreWithInitialValueAndSlotTracking({ + * abortSignal: AbortSignal.timeout(60_000), + * rpcRequest: rpc.getBalance(myAddress, { commitment: 'confirmed' }), + * rpcValueMapper: lamports => lamports, + * rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(myAddress), + * rpcSubscriptionValueMapper: ({ lamports }) => lamports, + * }); + * + * const unsubscribe = balanceStore.subscribe(() => { + * const error = balanceStore.getError(); + * if (error) console.error('Error:', error); + * else console.log('Balance:', balanceStore.getState()); + * }); + * ``` + * + * @see {@link ReactiveStore} + */ +export function createReactiveStoreWithInitialValueAndSlotTracking({ + abortSignal, + rpcRequest, + rpcValueMapper, + rpcSubscriptionRequest, + rpcSubscriptionValueMapper, +}: CreateReactiveStoreWithInitialValueAndSlotTrackingConfig< + TRpcValue, + TSubscriptionValue, + TItem +>): ReactiveStore { + let currentState: TItem | undefined; + let currentError: unknown; + let lastUpdateSlot = -1n; + const subscribers = new Set<() => void>(); + + const abortController = new AbortController(); + abortSignal.addEventListener('abort', () => abortController.abort(abortSignal.reason)); + const signal = abortController.signal; + + function notifySubscribers() { + subscribers.forEach(cb => cb()); + } + + function handleError(err: unknown) { + // Ignore if the signal has already been aborted + if (signal.aborted) return; + // Only capture the first error + if (currentError !== undefined) return; + currentError = err; + abortController.abort(err); + notifySubscribers(); + } + + rpcRequest + .send({ abortSignal: signal }) + .then(({ context: { slot }, value }) => { + if (signal.aborted) return; + if (slot < lastUpdateSlot) return; + lastUpdateSlot = slot; + currentState = rpcValueMapper(value); + notifySubscribers(); + }) + .catch(handleError); + + rpcSubscriptionRequest + .subscribe({ abortSignal: signal }) + .then(async notifications => { + for await (const { + context: { slot }, + value, + } of notifications) { + if (signal.aborted) return; + if (slot < lastUpdateSlot) continue; + lastUpdateSlot = slot; + currentState = rpcSubscriptionValueMapper(value); + notifySubscribers(); + } + }) + .catch(handleError); + + return { + getError(): unknown { + return currentError; + }, + getState(): TItem | undefined { + return currentState; + }, + subscribe(callback: () => void): () => void { + subscribers.add(callback); + return () => { + subscribers.delete(callback); + }; + }, + }; +} diff --git a/packages/kit/src/index.ts b/packages/kit/src/index.ts index 2216f0fcd..a17c5634a 100644 --- a/packages/kit/src/index.ts +++ b/packages/kit/src/index.ts @@ -25,6 +25,7 @@ export * from '@solana/rpc-types'; export * from '@solana/signers'; export * from '@solana/transaction-messages'; export * from '@solana/transactions'; +export * from './create-reactive-store-with-initial-value-and-slot-tracking'; export * from './airdrop'; export * from './compute-unit-limit-estimation'; export * from './decompile-transaction-message-fetching-lookup-tables'; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5a327dd40..3f2f76ed9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -741,6 +741,9 @@ importers: '@solana/signers': specifier: workspace:* version: link:../signers + '@solana/subscribable': + specifier: workspace:* + version: link:../subscribable '@solana/sysvars': specifier: workspace:* version: link:../sysvars