Skip to content

Commit

Permalink
Update RPC transports to use new types
Browse files Browse the repository at this point in the history
  • Loading branch information
lorisleiva committed Aug 23, 2024
1 parent e236173 commit 93d3a29
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 36 deletions.
11 changes: 9 additions & 2 deletions packages/rpc-api/src/__tests__/get-health-test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { SOLANA_ERROR__JSON_RPC__SERVER_ERROR_NODE_UNHEALTHY, SolanaError } from '@solana/errors';
import { createRpc, type Rpc } from '@solana/rpc-spec';
import { createRpc, type Rpc, type RpcResponse } from '@solana/rpc-spec';

import { createSolanaRpcApi, GetHealthApi } from '../index';
import { createLocalhostSolanaRpc } from './__setup__';

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('getHealth', () => {
describe('when the node is healthy', () => {
let rpc: Rpc<GetHealthApi>;
Expand All @@ -29,7 +36,7 @@ describe('getHealth', () => {
beforeEach(() => {
rpc = createRpc({
api: createSolanaRpcApi(),
transport: jest.fn().mockResolvedValue({ error: errorObject }),
transport: jest.fn().mockResolvedValue(createMockResponse({ error: errorObject })),
});
});
it('returns an error message', async () => {
Expand Down
14 changes: 11 additions & 3 deletions packages/rpc-spec/src/__tests__/rpc-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ import { createRpcMessage } from '@solana/rpc-spec-types';
import { createRpc, Rpc } from '../rpc';
import { RpcApi } from '../rpc-api';
import { PendingRpcApiRequest } from '../rpc-request';
import { RpcResponse } from '../rpc-shared';
import { RpcTransport } from '../rpc-transport';

interface TestRpcMethods {
someMethod(...args: unknown[]): unknown;
}

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('JSON-RPC 2.0', () => {
let makeHttpRequest: RpcTransport;
let rpc: Rpc<TestRpcMethods>;
Expand All @@ -34,7 +42,7 @@ describe('JSON-RPC 2.0', () => {
});
it('returns results from the transport', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
const result = await rpc.someMethod().send();
expect(result).toBe(123);
});
Expand Down Expand Up @@ -90,13 +98,13 @@ describe('JSON-RPC 2.0', () => {
});
it('calls the response transformer with the response from the JSON-RPC 2.0 endpoint', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
await rpc.someMethod().send();
expect(responseTransformer).toHaveBeenCalledWith(123, 'someMethod');
});
it('returns the processed response', async () => {
expect.assertions(1);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(123);
(makeHttpRequest as jest.Mock).mockResolvedValueOnce(createMockResponse(123));
const result = await rpc.someMethod().send();
expect(result).toBe('123 processed response');
});
Expand Down
10 changes: 6 additions & 4 deletions packages/rpc-spec/src/rpc-transport.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
type RpcTransportConfig = Readonly<{
import { RpcResponse } from './rpc-shared';

type RpcTransportRequest = Readonly<{
payload: unknown;
signal?: AbortSignal;
}>;

export interface RpcTransport {
<TResponse>(config: RpcTransportConfig): Promise<TResponse>;
}
export type RpcTransport = {
<TResponse>(request: RpcTransportRequest): Promise<RpcResponse<TResponse>>;
};
9 changes: 4 additions & 5 deletions packages/rpc-spec/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
createRpcMessage,
Flatten,
OverloadImplementations,
RpcResponseData,
UnionToIntersection,
} from '@solana/rpc-spec-types';

Expand Down Expand Up @@ -68,12 +67,12 @@ function createPendingRpcRequest<TRpcMethods, TRpcTransport extends RpcTransport
return {
async send(options?: RpcSendOptions): Promise<TResponse> {
const { methodName, params, responseTransformer } = pendingRequest;
const payload = createRpcMessage(methodName, params);
const response = await rpcConfig.transport<RpcResponseData<unknown>>({
payload,
const response = await rpcConfig.transport<TResponse>({
payload: createRpcMessage(methodName, params),
signal: options?.abortSignal,
});
return (responseTransformer ? responseTransformer(response, methodName) : response) as TResponse;
const responseData = await response.json();
return responseTransformer ? responseTransformer(responseData, methodName) : responseData;
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ describe('createHttpTransport and `AbortSignal`', () => {
it('resolves with the response', async () => {
expect.assertions(1);
jest.mocked(fetchSpy).mockResolvedValueOnce({
json: () => ({ ok: true }),
json: () => Promise.resolve({ ok: true }),
ok: true,
} as unknown as Response);
const sendPromise = makeHttpRequest({ payload: 123, signal: abortSignal });
abortController.abort('I got bored waiting');
await expect(sendPromise).resolves.toMatchObject({
const response = await sendPromise;
await expect(response.json()).resolves.toMatchObject({
ok: true,
});
});
Expand Down
9 changes: 6 additions & 3 deletions packages/rpc-transport-http/src/http-transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SOLANA_ERROR__RPC__TRANSPORT_HTTP_ERROR, SolanaError } from '@solana/errors';
import { RpcTransport } from '@solana/rpc-spec';
import { RpcResponse, RpcTransport } from '@solana/rpc-spec';
import type Dispatcher from 'undici-types/dispatcher';

import {
Expand Down Expand Up @@ -44,7 +44,7 @@ export function createHttpTransport(config: Config): RpcTransport {
return async function makeHttpRequest<TResponse>({
payload,
signal,
}: Parameters<RpcTransport>[0]): Promise<TResponse> {
}: Parameters<RpcTransport>[0]): Promise<RpcResponse<TResponse>> {
const body = JSON.stringify(payload);
const requestInfo = {
...dispatcherConfig,
Expand All @@ -66,6 +66,9 @@ export function createHttpTransport(config: Config): RpcTransport {
statusCode: response.status,
});
}
return (await response.json()) as TResponse;
return Object.freeze({
json: () => response.json(),
text: () => response.text(),
});
};
}
23 changes: 15 additions & 8 deletions packages/rpc/src/__tests__/rpc-request-coalescer-test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import type { RpcTransport } from '@solana/rpc-spec';
import type { RpcResponse, RpcTransport } from '@solana/rpc-spec';

import { getRpcTransportWithRequestCoalescing } from '../rpc-request-coalescer';

function createMockResponse<T>(jsonResponse: T): RpcResponse<T> {
return {
json: () => Promise.resolve(jsonResponse),
text: () => Promise.resolve(JSON.stringify(jsonResponse)),
};
}

describe('RPC request coalescer', () => {
let coalescedTransport: RpcTransport;
let hashFn: jest.MockedFunction<() => string | undefined>;
Expand Down Expand Up @@ -30,7 +37,7 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in the same tick receive the same response', async () => {
expect.assertions(2);
const mockResponse = { response: 'ok' };
const mockResponse = createMockResponse({ response: 'ok' });
mockTransport.mockResolvedValueOnce(mockResponse);
const responsePromiseA = coalescedTransport({ payload: null });
const responsePromiseB = coalescedTransport({ payload: null });
Expand All @@ -41,8 +48,8 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in different ticks receive different responses', async () => {
expect.assertions(2);
const mockResponseA = { response: 'okA' };
const mockResponseB = { response: 'okB' };
const mockResponseA = createMockResponse({ response: 'okA' });
const mockResponseB = createMockResponse({ response: 'okB' });
mockTransport.mockResolvedValueOnce(mockResponseA);
mockTransport.mockResolvedValueOnce(mockResponseB);
const responsePromiseA = coalescedTransport({ payload: null });
Expand Down Expand Up @@ -100,7 +107,7 @@ describe('RPC request coalescer', () => {
let abortControllerB: AbortController;
let responsePromiseA: ReturnType<typeof mockTransport>;
let responsePromiseB: ReturnType<typeof mockTransport>;
let transportResponsePromise: (value: unknown) => void;
let transportResponsePromise: (value: RpcResponse<unknown>) => void;
beforeEach(() => {
abortControllerA = new AbortController();
abortControllerB = new AbortController();
Expand Down Expand Up @@ -157,7 +164,7 @@ describe('RPC request coalescer', () => {
it('delivers responses to all but the aborted requests', async () => {
expect.assertions(2);
abortControllerA.abort('o no A');
const mockResponse = { response: 'ok' };
const mockResponse = createMockResponse({ response: 'ok' });
transportResponsePromise(mockResponse);
await Promise.all([
expect(responsePromiseA).rejects.toBe('o no A'),
Expand Down Expand Up @@ -192,8 +199,8 @@ describe('RPC request coalescer', () => {
});
it('multiple requests in the same tick receive different responses', async () => {
expect.assertions(2);
const mockResponseA = { response: 'okA' };
const mockResponseB = { response: 'okB' };
const mockResponseA = createMockResponse({ response: 'okA' });
const mockResponseB = createMockResponse({ response: 'okB' });
mockTransport.mockResolvedValueOnce(mockResponseA);
mockTransport.mockResolvedValueOnce(mockResponseB);
const responsePromiseA = coalescedTransport({ payload: null });
Expand Down
20 changes: 11 additions & 9 deletions packages/rpc/src/rpc-request-coalescer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { RpcTransport } from '@solana/rpc-spec';
import type { RpcResponse, RpcTransport } from '@solana/rpc-spec';

type CoalescedRequest = {
readonly abortController: AbortController;
numConsumers: number;
readonly responsePromise: Promise<unknown>;
readonly responsePromise: Promise<RpcResponse | undefined>;
};

type GetDeduplicationKeyFn = (payload: unknown) => string | undefined;
Expand All @@ -30,11 +30,13 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
getDeduplicationKey: GetDeduplicationKeyFn,
): TTransport {
let coalescedRequestsByDeduplicationKey: Record<string, CoalescedRequest> | undefined;
return async function makeCoalescedHttpRequest<TResponse>(config: Parameters<RpcTransport>[0]): Promise<TResponse> {
const { payload, signal } = config;
return async function makeCoalescedHttpRequest<TResponse>(
request: Parameters<RpcTransport>[0],
): Promise<RpcResponse<TResponse>> {
const { payload, signal } = request;
const deduplicationKey = getDeduplicationKey(payload);
if (deduplicationKey === undefined) {
return await transport(config);
return await transport(request);
}
if (!coalescedRequestsByDeduplicationKey) {
Promise.resolve().then(() => {
Expand All @@ -47,7 +49,7 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const responsePromise = (async () => {
try {
return await transport<TResponse>({
...config,
...request,
signal: abortController.signal,
});
} catch (e) {
Expand All @@ -69,8 +71,8 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
const coalescedRequest = coalescedRequestsByDeduplicationKey[deduplicationKey];
coalescedRequest.numConsumers++;
if (signal) {
const responsePromise = coalescedRequest.responsePromise as Promise<TResponse>;
return await new Promise<TResponse>((resolve, reject) => {
const responsePromise = coalescedRequest.responsePromise as Promise<RpcResponse<TResponse>>;
return await new Promise<RpcResponse<TResponse>>((resolve, reject) => {
const handleAbort = (e: AbortSignalEventMap['abort']) => {
signal.removeEventListener('abort', handleAbort);
coalescedRequest.numConsumers -= 1;
Expand All @@ -91,7 +93,7 @@ export function getRpcTransportWithRequestCoalescing<TTransport extends RpcTrans
});
});
} else {
return (await coalescedRequest.responsePromise) as TResponse;
return (await coalescedRequest.responsePromise) as RpcResponse<TResponse>;
}
} as TTransport;
}

0 comments on commit 93d3a29

Please sign in to comment.