Skip to content

Commit

Permalink
Update RPC transports to use new types (#3148)
Browse files Browse the repository at this point in the history
This PR updates the RPC Transport layer by making sure they return the new `RpcResponse` interface instead of returning the parsed response data directly.
  • Loading branch information
lorisleiva authored Sep 1, 2024
1 parent 4f87d12 commit e1cb697
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 48 deletions.
8 changes: 8 additions & 0 deletions .changeset/stupid-tomatoes-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@solana/rpc-transport-http': patch
'@solana/rpc-spec': patch
'@solana/rpc-api': patch
'@solana/rpc': patch
---

Make `RpcTransport` return new `RpcReponse` type instead of parsed JSON data
11 changes: 9 additions & 2 deletions packages/rpc-api/src/__tests__/get-health-test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { SOLANA_ERROR__JSON_RPC__SERVER_ERROR_NODE_UNHEALTHY, SolanaError } from '@solana/errors';
import { createRpc, type Rpc } from '@solana/rpc-spec';
import { createRpc, type Rpc, type RpcResponse } from '@solana/rpc-spec';

import { createSolanaRpcApi, GetHealthApi } from '../index';
import { createLocalhostSolanaRpc } from './__setup__';

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('getHealth', () => {
describe('when the node is healthy', () => {
let rpc: Rpc<GetHealthApi>;
Expand All @@ -29,7 +36,7 @@ describe('getHealth', () => {
beforeEach(() => {
rpc = createRpc({
api: createSolanaRpcApi(),
transport: jest.fn().mockResolvedValue({ error: errorObject }),
transport: jest.fn().mockResolvedValue(createMockResponse({ error: errorObject })),
});
});
it('returns an error message', async () => {
Expand Down
14 changes: 11 additions & 3 deletions packages/rpc-spec/src/__tests__/rpc-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ import { createRpcMessage } from '@solana/rpc-spec-types';
import { createRpc, Rpc } from '../rpc';
import { RpcApi } from '../rpc-api';
import { RpcApiRequestPlan } from '../rpc-request';
import { RpcResponse } from '../rpc-shared';
import { RpcTransport } from '../rpc-transport';

interface TestRpcMethods {
someMethod(...args: unknown[]): unknown;
}

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('JSON-RPC 2.0', () => {
let makeHttpRequest: RpcTransport;
let rpc: Rpc<TestRpcMethods>;
Expand All @@ -34,7 +42,7 @@ describe('JSON-RPC 2.0', () => {
});
it('returns results from the transport', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
const result = await rpc.someMethod().send();
expect(result).toBe(123);
});
Expand Down Expand Up @@ -90,13 +98,13 @@ describe('JSON-RPC 2.0', () => {
});
it('calls the response transformer with the response from the JSON-RPC 2.0 endpoint', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
await rpc.someMethod().send();
expect(responseTransformer).toHaveBeenCalledWith(123, 'someMethod');
});
it('returns the processed response', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
const result = await rpc.someMethod().send();
expect(result).toBe('123 processed response');
});
Expand Down
10 changes: 6 additions & 4 deletions packages/rpc-spec/src/rpc-transport.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
type RpcTransportConfig = Readonly<{
import { RpcResponse } from './rpc-shared';

type RpcTransportRequest = Readonly<{
payload: unknown;
signal?: AbortSignal;
}>;

export interface RpcTransport {
<TResponse>(config: RpcTransportConfig): Promise<TResponse>;
}
export type RpcTransport = {
<TResponse>(request: RpcTransportRequest): Promise<RpcResponse<TResponse>>;
};
9 changes: 4 additions & 5 deletions packages/rpc-spec/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
createRpcMessage,
Flatten,
OverloadImplementations,
RpcResponseData,
UnionToIntersection,
} from '@solana/rpc-spec-types';

Expand Down Expand Up @@ -68,12 +67,12 @@ function createPendingRpcRequest<TRpcMethods, TRpcTransport extends RpcTransport
return {
async send(options?: RpcSendOptions): Promise<TResponse> {
const { methodName, params, responseTransformer } = pendingRequest;
const payload = createRpcMessage(methodName, params);
const response = await rpcConfig.transport<RpcResponseData<unknown>>({
payload,
const response = await rpcConfig.transport<TResponse>({
payload: createRpcMessage(methodName, params),
signal: options?.abortSignal,
});
return (responseTransformer ? responseTransformer(response, methodName) : response) as TResponse;
const responseData = await response.json();
return responseTransformer ? responseTransformer(responseData, methodName) : responseData;
},
};
}
26 changes: 14 additions & 12 deletions packages/rpc-transport-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const transport = createHttpTransport({ url: 'https://api.mainnet-beta.solana.co
const response = await transport({
payload: { id: 1, jsonrpc: '2.0', method: 'getSlot' },
});
const data = await response.json();
```

#### Config
Expand Down Expand Up @@ -67,16 +68,17 @@ const transport = createHttpTransport({
});
let id = 0;
const balances = await Promise.allSettled(
accounts.map(account =>
transport({
accounts.map(async account => {
const response = await transport({
payload: {
id: ++id,
jsonrpc: '2.0',
method: 'getBalance',
params: [account],
},
}),
),
});
return await response.json();
}),
);
```

Expand Down Expand Up @@ -109,7 +111,7 @@ Using this core transport, you can implement specialized functionality for lever
Here’s an example of how someone might implement a “round robin” approach to distribute requests to multiple transports:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Create a transport for each RPC server
Expand All @@ -121,7 +123,7 @@ const transports = [

// Create a wrapper transport that distributes requests to them
let nextTransport = 0;
async function roundRobinTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function roundRobinTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
const transport = transports[nextTransport];
nextTransport = (nextTransport + 1) % transports.length;
return await transport(...args);
Expand All @@ -135,7 +137,7 @@ Another example of a possible customization for a transport is to shard requests
Perhaps your application needs to make a large number of requests, or needs to fan request for different methods out to different servers. Here’s an example of an implementation that does the latter:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Create multiple transports
Expand All @@ -160,7 +162,7 @@ function selectShard(method: string): RpcTransport {
}
}

async function shardingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function shardingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
const payload = args[0].payload as { method: string };
const selectedTransport = selectShard(payload.method);
return await selectedTransport(...args);
Expand All @@ -172,7 +174,7 @@ async function shardingTransport<TResponse>(...args: Parameters<RpcTransport>):
The transport library can also be used to implement custom retry logic on any request:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Set the maximum number of attempts to retry a request
Expand All @@ -193,7 +195,7 @@ function calculateRetryDelay(attempt: number): number {
}

// A retrying transport that will retry up to `MAX_ATTEMPTS` times before failing
async function retryingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function retryingTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
let requestError;
for (let attempts = 0; attempts < MAX_ATTEMPTS; attempts++) {
try {
Expand All @@ -216,7 +218,7 @@ async function retryingTransport<TResponse>(...args: Parameters<RpcTransport>):
Here’s an example of some failover logic integrated into a transport:

```ts
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import { createHttpTransport } from '@solana/rpc-transport-http';

// Create a transport for each RPC server
Expand All @@ -227,7 +229,7 @@ const transports = [
];

// A failover transport that will try each transport in order until one succeeds before failing
async function failoverTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
async function failoverTransport<TResponse>(...args: Parameters<RpcTransport>): Promise<RpcResponse<TResponse>> {
let requestError;

for (const transport of transports) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ describe('createHttpTransport and `AbortSignal`', () => {
it('resolves with the response', async () => {
expect.assertions(1);
jest.mocked(fetchSpy).mockResolvedValueOnce({
json: () => ({ ok: true }),
json: () => Promise.resolve({ ok: true }),
ok: true,
} as unknown as Response);
const sendPromise = makeHttpRequest({ payload: 123, signal: abortSignal });
abortController.abort('I got bored waiting');
await expect(sendPromise).resolves.toMatchObject({
const response = await sendPromise;
await expect(response.json()).resolves.toMatchObject({
ok: true,
});
});
Expand Down
9 changes: 6 additions & 3 deletions packages/rpc-transport-http/src/http-transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SolanaError } from '@solana/errors';
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import type Dispatcher from 'undici-types/dispatcher';

import {
Expand Down Expand Up @@ -44,7 +44,7 @@ export function createHttpTransport(config: Config): RpcTransport {
return async function makeHttpRequest<TResponse>({
payload,
signal,
}: Parameters<RpcTransport>[0]): Promise<TResponse> {
}: Parameters<RpcTransport>[0]): Promise<RpcResponse<TResponse>> {
const body = JSON.stringify(payload);
const requestInfo = {
...dispatcherConfig,
Expand All @@ -66,6 +66,9 @@ export function createHttpTransport(config: Config): RpcTransport {
statusCode: response.status,
});
}
return (await response.json()) as TResponse;
return Object.freeze({
json: () => response.json(),
text: () => response.text(),
});
};
}
23 changes: 15 additions & 8 deletions packages/rpc/src/__tests__/rpc-request-coalescer-test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import type { RpcTransport } from '@solana/rpc-spec';
import type { RpcResponse, RpcTransport } from '@solana/rpc-spec';

import { getRpcTransportWithRequestCoalescing } from '../rpc-request-coalescer';

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('RPC request coalescer', () => {
let coalescedTransport: RpcTransport;
let hashFn: jest.MockedFunction<() => string | undefined>;
Expand Down Expand Up @@ -30,7 +37,7 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in the same tick receive the same response', async () => {
expect.assertions(2);
const mockResponse = { response: 'ok' };
const mockResponse = createMockResponse({ response: 'ok' });
mockTransport.mockResolvedValueOnce(mockResponse);
const responsePromiseA = coalescedTransport({ payload: null });
const responsePromiseB = coalescedTransport({ payload: null });
Expand All @@ -41,8 +48,8 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in different ticks receive different responses', async () => {
expect.assertions(2);
const mockResponseA = { response: 'okA' };
const mockResponseB = { response: 'okB' };
const mockResponseA = createMockResponse({ response: 'okA' });
const mockResponseB = createMockResponse({ response: 'okB' });
mockTransport.mockResolvedValueOnce(mockResponseA);
mockTransport.mockResolvedValueOnce(mockResponseB);
const responsePromiseA = coalescedTransport({ payload: null });
Expand Down Expand Up @@ -100,7 +107,7 @@ describe('RPC request coalescer', () => {
let abortControllerB: AbortController;
let responsePromiseA: ReturnType<typeof mockTransport>;
let responsePromiseB: ReturnType<typeof mockTransport>;
let transportResponsePromise: (value: unknown) => void;
let transportResponsePromise: (value: RpcResponse<unknown>) => void;
beforeEach(() => {
abortControllerA = new AbortController();
abortControllerB = new AbortController();
Expand Down Expand Up @@ -157,7 +164,7 @@ describe('RPC request coalescer', () => {
it('delivers responses to all but the aborted requests', async () => {
expect.assertions(2);
abortControllerA.abort('o no A');
const mockResponse = { response: 'ok' };
const mockResponse = createMockResponse({ response: 'ok' });
transportResponsePromise(mockResponse);
await Promise.all([
expect(responsePromiseA).rejects.toBe('o no A'),
Expand Down Expand Up @@ -192,8 +199,8 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in the same tick receive different responses', async () => {
expect.assertions(2);
const mockResponseA = { response: 'okA' };
const mockResponseB = { response: 'okB' };
const mockResponseA = createMockResponse({ response: 'okA' });
const mockResponseB = createMockResponse({ response: 'okB' });
mockTransport.mockResolvedValueOnce(mockResponseA);
mockTransport.mockResolvedValueOnce(mockResponseB);
const responsePromiseA = coalescedTransport({ payload: null });
Expand Down
20 changes: 11 additions & 9 deletions packages/rpc/src/rpc-request-coalescer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { RpcTransport } from '@solana/rpc-spec';
import type { RpcResponse, RpcTransport } from '@solana/rpc-spec';

type CoalescedRequest = {
readonly abortController: AbortController;
numConsumers: number;
readonly responsePromise: Promise<unknown>;
readonly responsePromise: Promise<RpcResponse | undefined>;
};

type GetDeduplicationKeyFn = (payload: unknown) => string | undefined;
Expand All @@ -30,11 +30,13 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
getDeduplicationKey: GetDeduplicationKeyFn,
): TTransport {
let coalescedRequestsByDeduplicationKey: Record<string, CoalescedRequest> | undefined;
return async function makeCoalescedHttpRequest<TResponse>(config: Parameters<RpcTransport>[0]): Promise<TResponse> {
const { payload, signal } = config;
return async function makeCoalescedHttpRequest<TResponse>(
request: Parameters<RpcTransport>[0],
): Promise<RpcResponse<TResponse>> {
const { payload, signal } = request;
const deduplicationKey = getDeduplicationKey(payload);
if (deduplicationKey === undefined) {
return await transport(config);
return await transport(request);
}
if (!coalescedRequestsByDeduplicationKey) {
Promise.resolve().then(() => {
Expand All @@ -47,7 +49,7 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const responsePromise = (async () => {
try {
return await transport<TResponse>({
...config,
...request,
signal: abortController.signal,
});
} catch (e) {
Expand All @@ -69,8 +71,8 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const coalescedRequest = coalescedRequestsByDeduplicationKey[deduplicationKey];
coalescedRequest.numConsumers++;
if (signal) {
const responsePromise = coalescedRequest.responsePromise as Promise<TResponse>;
return await new Promise<TResponse>((resolve, reject) => {
const responsePromise = coalescedRequest.responsePromise as Promise<RpcResponse<TResponse>>;
return await new Promise<RpcResponse<TResponse>>((resolve, reject) => {
const handleAbort = (e: AbortSignalEventMap['abort']) => {
signal.removeEventListener('abort', handleAbort);
coalescedRequest.numConsumers -= 1;
Expand All @@ -91,7 +93,7 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
});
});
} else {
return (await coalescedRequest.responsePromise) as TResponse;
return (await coalescedRequest.responsePromise) as RpcResponse<TResponse>;
}
} as TTransport;
}

0 comments on commit e1cb697

Please sign in to comment.