Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions configs/vitest.config.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export const e2eProject = defineProject({
singleFork: true,
},
},
sequence: {
concurrent: false,
shuffle: false,
},
Comment on lines +22 to +25
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a behavior change, but will help to make sure that behavior does not change unintentionally in future.

},
});

Expand All @@ -41,5 +45,9 @@ export const e2eMainnetProject = defineProject({
singleFork: true,
},
},
sequence: {
concurrent: false,
shuffle: false,
},
},
});
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export async function createNodeJsLibp2p(
},
}),
],
streamMuxers: [mplex({maxInboundStreams: 256})],
streamMuxers: [mplex({maxInboundStreams: 256, disconnectThreshold: networkOpts.disconnectThreshold})],
peerDiscovery,
metrics: nodeJsLibp2pOpts.metrics
? prometheusMetrics({
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ export class Network implements INetwork {
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
await this.core.close();
this.logger.debug("network core closed");

// Used only for sleep() statements
this.controller.abort();
this.logger.debug("network core closed");
}

async scrapeMetrics(): Promise<string> {
Expand Down
19 changes: 19 additions & 0 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ export interface NetworkOptions
useWorker?: boolean;
maxYoungGenerationSizeMb?: number;
disableLightClientServer?: boolean;

/**
* During E2E tests observe a lot of following `missing stream`:
*
* > libp2p:mplex receiver stream with id 2 and protocol /eth2/beacon_chain/req/metadata/2/ssz_snappy ended
* > libp2p:mplex initiator stream with id 4 and protocol /eth2/beacon_chain/req/metadata/2/ssz_snappy ended
* > libp2p:mplex initiator stream with id 2 and protocol /eth2/beacon_chain/req/metadata/2/ssz_snappy ended
* > libp2p:mplex missing stream 2 for message type CLOSE_INITIATOR
* > libp2p:mplex missing stream 2 for message type CLOSE_RECEIVER
* > libp2p:mplex missing stream 4 for message type CLOSE_INITIATOR
*
* which results in following rate-limit error and cause the connection to close and fail the e2e tests
* > libp2p:mplex rate limit hit when receiving messages for streams that do not exist - closing remote connection
* > libp2p:mplex:stream:initiator:3 abort with error Error: Too many messages for missing streams
*
* The default value for `disconnectThreshold` in libp2p is set to `5`.
* We need to increase this only for the testing purpose
*/
disconnectThreshold?: number;
}

export const defaultNetworkOptions: NetworkOptions = {
Expand Down
51 changes: 28 additions & 23 deletions packages/beacon-node/test/e2e/network/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {chainConfig} from "@lodestar/config/default";
import {ForkName} from "@lodestar/params";
import {RequestError, RequestErrorCode, ResponseOutgoing} from "@lodestar/reqresp";
import {Root, SignedBeaconBlock, altair, phase0, ssz} from "@lodestar/types";
import {sleep as _sleep} from "@lodestar/utils";
import {sleep as _sleep, sleep} from "@lodestar/utils";
import {afterEach, beforeEach, describe, expect, it, vi} from "vitest";
import {Network, ReqRespBeaconNodeOpts} from "../../../src/network/index.js";
import {GetReqRespHandlerFn, ReqRespMethod} from "../../../src/network/reqresp/types.js";
Expand Down Expand Up @@ -33,24 +33,21 @@ function runTests({useWorker}: {useWorker: boolean}): void {
...chainConfig,
ALTAIR_FORK_EPOCH: 0,
});
let controller: AbortController;

const afterEachCallbacks: (() => Promise<void> | void)[] = [];

beforeEach(() => {
controller = new AbortController();
});
Copy link
Member

@nflaig nflaig May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to abort the controller in afterEach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had it before but does not make sense of it with these changes. Mostly it's used for sleep and peer connection, if any of those hangs up the test will already timeout.


afterEach(async () => {
while (afterEachCallbacks.length > 0) {
const callback = afterEachCallbacks.pop();
if (callback) await callback();
}
});

let controller: AbortController;
beforeEach(() => {
controller = new AbortController();
});
afterEach(() => controller.abort());
async function sleep(ms: number): Promise<void> {
await _sleep(ms, controller.signal);
}

async function createAndConnectPeers(
getReqRespHandler?: GetReqRespHandlerFn,
opts?: ReqRespBeaconNodeOpts
Expand All @@ -68,11 +65,15 @@ function runTests({useWorker}: {useWorker: boolean}): void {
await closeA();
await closeB();
});

const connected = Promise.all([onPeerConnect(netA), onPeerConnect(netB)]);
await connect(netA, netB);
await connect(netA, netB, controller.signal);
await connected;

controller.signal.addEventListener("abort", async () => {
await closeA();
await closeB();
});

return [netA, netB, await getPeerIdOf(netA), await getPeerIdOf(netB)];
}

Expand Down Expand Up @@ -242,15 +243,15 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});

it("trigger a TTFB_TIMEOUT error", async () => {
it("should trigger TTFB_TIMEOUT error if first response is delayed", async () => {
const ttfbTimeoutMs = 250;

const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
// Wait for too long before sending first response chunk
await sleep(ttfbTimeoutMs * 10);
await sleep(ttfbTimeoutMs * 10, controller.signal);
yield wrapBlockAsEncodedPayload(config, config.getForkTypes(0).SignedBeaconBlock.defaultValue());
}
},
Expand All @@ -263,20 +264,21 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});

it("trigger a RESP_TIMEOUT error", async () => {
const respTimeoutMs = 250;
it("should trigger a RESP_TIMEOUT error if first byte is on time but later delayed", async () => {
const ttfbTimeoutMs = 250;
const respTimeoutMs = 300;

const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
yield getEmptyEncodedPayloadSignedBeaconBlock(config);
// Wait for too long before sending second response chunk
await sleep(respTimeoutMs * 5);
await sleep(respTimeoutMs * 5, controller.signal);
yield getEmptyEncodedPayloadSignedBeaconBlock(config);
}
},
{respTimeoutMs}
{ttfbTimeoutMs, respTimeoutMs}
);

await expectRejectedWithLodestarError(
Expand All @@ -285,16 +287,19 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});

it("Sleep infinite on first byte", async () => {
it("should trigger TTFB_TIMEOUT error if respTimeoutMs and ttfbTimeoutMs is the same", async () => {
const ttfbTimeoutMs = 250;
const respTimeoutMs = 250;

const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
// biome-ignore lint/correctness/useYield: No need for yield in test context
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
await sleep(100000000);
await sleep(100000000, controller.signal);
}
},
{respTimeoutMs: 250, ttfbTimeoutMs: 250}
{respTimeoutMs, ttfbTimeoutMs}
);

await expectRejectedWithLodestarError(
Expand All @@ -303,13 +308,13 @@ function runTests({useWorker}: {useWorker: boolean}): void {
);
});

it("Sleep infinite on second response chunk", async () => {
it("should trigger a RESP_TIMEOUT error if first byte is on time but sleep infinite", async () => {
const [netA, _, _0, peerIdB] = await createAndConnectPeers(
(method) =>
async function* onRequest() {
if (method === ReqRespMethod.BeaconBlocksByRange) {
yield getEmptyEncodedPayloadSignedBeaconBlock(config);
await sleep(100000000);
await sleep(100000000, controller.signal);
}
},
{respTimeoutMs: 250, ttfbTimeoutMs: 250}
Expand Down
7 changes: 6 additions & 1 deletion packages/beacon-node/test/utils/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {generateKeyPair} from "@libp2p/crypto/keys";
import {PrivateKey} from "@libp2p/interface";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {SubnetID} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {INetwork, Network, NetworkEvent} from "../../src/network/index.js";
import {Libp2p} from "../../src/network/interface.js";
import {createNodeJsLibp2p} from "../../src/network/libp2p/index.js";
Expand Down Expand Up @@ -35,9 +36,13 @@ type INetworkDebug = Pick<INetwork, "connectToPeer" | "disconnectPeer" | "getNet

// Helpers to manipulate network's libp2p instance for testing only

export async function connect(netDial: INetworkDebug, netServer: INetworkDebug): Promise<void> {
export async function connect(netDial: INetworkDebug, netServer: INetworkDebug, signal?: AbortSignal): Promise<void> {
const netServerId = await netServer.getNetworkIdentity();
await netDial.connectToPeer(netServerId.peerId, netServerId.p2pAddresses);

// We see a lot of "Muxer already closed" in e2e tests on CI
// This is a way to give a grace period for connections to open and exchange metadata
await sleep(50, signal);
}

export async function disconnect(network: INetworkDebug, peer: string): Promise<void> {
Expand Down
16 changes: 14 additions & 2 deletions packages/beacon-node/test/utils/networkWithMockDb.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {generateKeyPair} from "@libp2p/crypto/keys";
import {ChainForkConfig, createBeaconConfig} from "@lodestar/config";
import {ssz} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {BeaconChain} from "../../src/chain/chain.js";
import {Eth1ForBlockProductionDisabled} from "../../src/eth1/index.js";
import {ExecutionEngineDisabled} from "../../src/execution/index.js";
Expand Down Expand Up @@ -89,15 +90,18 @@ export async function getNetworkForTest(
privateKey: await generateKeyPair("secp256k1"),
opts: {
...defaultNetworkOptions,
maxPeers: 1,
maxPeers: 10,
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: ["/ip4/127.0.0.1/tcp/0"],
localMultiaddrs: ["/ip4/0.0.0.0/tcp/0"],
discv5FirstQueryDelayMs: 0,
discv5: null,
skipParamsLog: true,
// Disable rate limiting
rateLimitMultiplier: 0,
// Increase of following value is just to circumvent the following error in e2e tests
// > libp2p:mplex rate limit hit when receiving messages
disconnectThreshold: 255,
...opts.opts,
},
logger,
Expand All @@ -108,6 +112,14 @@ export async function getNetworkForTest(
async function closeAll() {
await network.close();
await chain.close();

/**
* We choose random port for the libp2p network. Though our libp2p instance is closed the
* system still hold the port momentarily. And if next test randomly select the same port
* it failed with ERR_CONNECTION_REFUSED. To avoid such situation giving a grace period
* for the system to also cleanup resources.
*/
await sleep(100);
},
];
}
21 changes: 13 additions & 8 deletions packages/params/test/e2e/ensure-config-is-synced.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,19 @@ function assertCorrectPreset(localPreset: BeaconPreset, remotePreset: BeaconPres
}

async function downloadRemoteConfig(preset: "mainnet" | "minimal", commit: string): Promise<BeaconPreset> {
const downloadedParams = await Promise.all(
Object.values(ForkName).map((forkName) =>
axios({
url: `https://raw.githubusercontent.com/ethereum/consensus-specs/${commit}/presets/${preset}/${forkName}.yaml`,
timeout: 30 * 1000,
}).then((response) => loadConfigYaml(response.data))
)
);
const downloadedParams: Record<string, unknown>[] = [];

for (const forkName of Object.values(ForkName)) {
const response = await axios({
url: `https://raw.githubusercontent.com/ethereum/consensus-specs/${commit}/presets/${preset}/${forkName}.yaml`,
timeout: 30 * 1000,
});
downloadedParams.push(loadConfigYaml(response.data));

// We get error `Request failed with status code 429`
// which is `Too Many Request` so we added a bit delay between each request
await new Promise((resolve) => setTimeout(resolve, 200));
}

// Merge all the fetched yamls for the different forks
const beaconPresetRaw: Record<string, unknown> = Object.assign(
Expand Down
Loading