Merged
6 changes: 6 additions & 0 deletions packages/api/src/beacon/routes/lodestar.ts
@@ -76,6 +76,12 @@ export type StateCacheItem = {

export type LodestarNodePeer = NodePeer & {
agentVersion: string;
status: unknown | null;
metadata: unknown | null;
agentClient: string;
lastReceivedMsgUnixTsMs: number;
lastStatusUnixTsMs: number;
connectedUnixTsMs: number;
};

export type BlacklistedBlock = {root: RootHex; slot: Slot | null};
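For context, a minimal sketch of how a client could consume the new timestamp fields, assuming Lodestar's lodestar-namespaced peers endpoint returns this shape inside a `data` envelope; the URL, envelope, and local type are assumptions for illustration, not part of this diff.

```ts
// Hedged sketch: endpoint URL and response envelope are assumed, not
// taken from this PR.
type LodestarNodePeer = {
  peerId: string;
  agentVersion: string;
  status: unknown | null;
  metadata: unknown | null;
  agentClient: string;
  lastReceivedMsgUnixTsMs: number;
  lastStatusUnixTsMs: number;
  connectedUnixTsMs: number;
};

// List peers that have been silent for longer than maxQuietMs
async function listQuietPeers(baseUrl: string, maxQuietMs: number): Promise<LodestarNodePeer[]> {
  const res = await fetch(`${baseUrl}/eth/v1/lodestar/peers`);
  const {data} = (await res.json()) as {data: LodestarNodePeer[]};
  const now = Date.now();
  return data.filter((p) => p.lastReceivedMsgUnixTsMs > 0 && now - p.lastReceivedMsgUnixTsMs > maxQuietMs);
}
```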
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
@@ -112,6 +112,10 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
help: "Peer manager heartbeat function duration in seconds",
buckets: [0.001, 0.01, 0.1, 1],
}),
starved: register.gauge({
name: "lodestar_peer_manager_starved_bool",
Review comment (Member): if you have time would be good to add a panel somewhere that uses the bool

help: "Whether lodestar is starved of data while syncing",
}),
},
leakedConnectionsCount: register.gauge({
name: "lodestar_peer_manager_leaked_connections_count",
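Following up on the review comment above, a hedged sketch of how a script or dashboard backend could read the new gauge through Prometheus's HTTP query API; the Prometheus URL is an assumption, and a Grafana stat panel would use the same one-line PromQL query.

```ts
// Hedged sketch: assumes the node's metrics are scraped by a Prometheus
// instance reachable at promUrl; uses the standard /api/v1/query endpoint.
async function isNodeStarved(promUrl: string): Promise<boolean> {
  const query = encodeURIComponent("lodestar_peer_manager_starved_bool");
  const res = await fetch(`${promUrl}/api/v1/query?query=${query}`);
  const body = (await res.json()) as {data: {result: {value: [number, string]}[]}};
  // Instant-query values come back as [timestamp, "value-string"]
  return body.data.result[0]?.value[1] === "1";
}
```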
27 changes: 19 additions & 8 deletions packages/beacon-node/src/network/core/networkCore.ts
@@ -7,7 +7,7 @@ import {BeaconConfig} from "@lodestar/config";
import type {LoggerNode} from "@lodestar/logger/node";
import {ForkName} from "@lodestar/params";
import {ResponseIncoming} from "@lodestar/reqresp";
import {Epoch, phase0} from "@lodestar/types";
import {Epoch, phase0, ssz} from "@lodestar/types";
import {fromHex} from "@lodestar/utils";
import {multiaddr} from "@multiformats/multiaddr";
import {formatNodePeer} from "../../api/impl/node/utils.js";
@@ -387,18 +387,29 @@ export class NetworkCore implements INetworkCore {
await this.libp2p.hangUp(peerIdFromString(peerIdStr));
}

private _dumpPeer(peerIdStr: string, connections: Connection[]): routes.lodestar.LodestarNodePeer {
const peerData = this.peersData.connectedPeers.get(peerIdStr);
return {
...formatNodePeer(peerIdStr, connections),
agentVersion: peerData?.agentVersion ?? "NA",
status: peerData?.status ? ssz.phase0.Status.toJson(peerData.status) : null,
metadata: peerData?.metadata ? ssz.altair.Metadata.toJson(peerData.metadata) : null,
Review comment (@nflaig, May 3, 2025): once Fusaka goes live we should update this to use fulu.Metadata, or just if-else based on clock slot but we need the type first #7774

agentClient: String(peerData?.agentClient ?? "Unknown"),
lastReceivedMsgUnixTsMs: peerData?.lastReceivedMsgUnixTsMs ?? 0,
lastStatusUnixTsMs: peerData?.lastStatusUnixTsMs ?? 0,
connectedUnixTsMs: peerData?.connectedUnixTsMs ?? 0,
};
}

async dumpPeer(peerIdStr: string): Promise<routes.lodestar.LodestarNodePeer | undefined> {
const connections = this.getConnectionsByPeer().get(peerIdStr);
return connections
? {...formatNodePeer(peerIdStr, connections), agentVersion: this.peersData.getAgentVersion(peerIdStr)}
: undefined;
return connections ? this._dumpPeer(peerIdStr, connections) : undefined;
}

async dumpPeers(): Promise<routes.lodestar.LodestarNodePeer[]> {
return Array.from(this.getConnectionsByPeer().entries()).map(([peerIdStr, connections]) => ({
...formatNodePeer(peerIdStr, connections),
agentVersion: this.peersData.getAgentVersion(peerIdStr),
}));
return Array.from(this.getConnectionsByPeer().entries()).map(([peerIdStr, connections]) =>
this._dumpPeer(peerIdStr, connections)
);
}

async dumpPeerScoreStats(): Promise<PeerScoreStats> {
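Regarding the review note about `fulu.Metadata` above: a hypothetical sketch of the suggested clock-slot if-else. `ssz.fulu.Metadata` does not exist yet (#7774), so the fork-aware branch stays commented out; `ChainForkConfig.getForkName` and `ForkSeq` are existing @lodestar APIs, but wiring them up this way is an assumption about how the follow-up could look, not part of this PR.

```ts
import {ChainForkConfig} from "@lodestar/config";
import {ForkSeq} from "@lodestar/params";
import {Slot, altair, ssz} from "@lodestar/types";

// Hypothetical follow-up: serialize metadata per-fork once a fulu
// Metadata type lands (#7774); until then, always use the altair type.
function metadataToJson(config: ChainForkConfig, currentSlot: Slot, metadata: altair.Metadata | null): unknown {
  if (metadata === null) return null;
  const forkSeq = ForkSeq[config.getForkName(currentSlot)];
  // After Fusaka goes live:
  // return forkSeq >= ForkSeq.fulu
  //   ? ssz.fulu.Metadata.toJson(metadata) // type does not exist yet
  //   : ssz.altair.Metadata.toJson(metadata);
  void forkSeq; // unused until the fulu type exists
  return ssz.altair.Metadata.toJson(metadata);
}
```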
34 changes: 31 additions & 3 deletions packages/beacon-node/src/network/peers/peerManager.ts
@@ -2,7 +2,7 @@ import {BitArray} from "@chainsafe/ssz";
import {Connection, PeerId, PrivateKey} from "@libp2p/interface";
import {BeaconConfig} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {Metadata, altair, phase0} from "@lodestar/types";
import {withTimeout} from "@lodestar/utils";
import {GOODBYE_KNOWN_CODES, GoodByeReasonCode, Libp2pEvent} from "../../constants/index.js";
@@ -53,6 +53,11 @@ const PEER_RELEVANT_TAG = "relevant";
/** Tag value of PEER_RELEVANT_TAG */
const PEER_RELEVANT_TAG_VALUE = 100;

/** Change pruning behavior once the head falls behind by this many slots */
const STARVATION_THRESHOLD_SLOTS = SLOTS_PER_EPOCH * 2;
/** Percentage of peers to attempt to prune when starvation threshold is met */
const STARVATION_PRUNE_RATIO = 0.05;

/**
* Relative factor of peers that are allowed to have a negative gossipsub score without penalizing them in lodestar.
*/
@@ -141,6 +146,7 @@ export class PeerManager {
private readonly discovery: PeerDiscovery | null;
private readonly networkEventBus: INetworkEventBus;
private readonly statusCache: StatusCache;
private lastStatus: phase0.Status;

// A single map of connected peers with all necessary data to handle PINGs, STATUS, and metrics
private connectedPeers: Map<PeerIdStr, PeerData>;
@@ -174,6 +180,8 @@
this.libp2p.services.components.events.addEventListener(Libp2pEvent.connectionClose, this.onLibp2pPeerDisconnect);
this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest);

this.lastStatus = this.statusCache.get();

// On start-up, connect to existing peers in libp2p.peerStore, same as autoDial behaviour
this.heartbeat();
this.intervals = [
@@ -342,7 +350,10 @@
private onStatus(peer: PeerId, status: phase0.Status): void {
// reset the to-status timer of this peer
const peerData = this.connectedPeers.get(peer.toString());
if (peerData) peerData.lastStatusUnixTsMs = Date.now();
if (peerData) {
peerData.lastStatusUnixTsMs = Date.now();
peerData.status = status;
}

let isIrrelevant: boolean;
try {
@@ -450,12 +461,22 @@
}
}

const status = this.statusCache.get();
const starved =
// while syncing progress is happening, we aren't starved
this.lastStatus.headSlot === status.headSlot &&
// if the head falls behind the threshold, we are starved
this.clock.currentSlot - status.headSlot > STARVATION_THRESHOLD_SLOTS;
this.lastStatus = status;
this.metrics?.peerManager.starved.set(starved ? 1 : 0);

const {peersToDisconnect, peersToConnect, attnetQueries, syncnetQueries} = prioritizePeers(
connectedHealthyPeers.map((peer) => {
const peerData = this.connectedPeers.get(peer.toString());
return {
id: peer,
direction: peerData?.direction ?? null,
status: peerData?.status ?? null,
attnets: peerData?.metadata?.attnets ?? null,
syncnets: peerData?.metadata?.syncnets ?? null,
score: this.peerRpcScores.getScore(peer),
@@ -464,7 +485,13 @@
// Collect subnets which we need peers for in the current slot
this.attnetsService.getActiveSubnets(),
this.syncnetsService.getActiveSubnets(),
this.opts
{
...this.opts,
status,
starved,
starvationPruneRatio: STARVATION_PRUNE_RATIO,
starvationThresholdSlots: STARVATION_THRESHOLD_SLOTS,
}
);

const queriesMerged: SubnetDiscvQueryMs[] = [];
@@ -598,6 +625,7 @@
relevantStatus: RelevantPeerStatus.Unknown,
direction,
peerId: remotePeer,
status: null,
metadata: null,
agentVersion: null,
agentClient: null,
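To make the new constants concrete, a worked example of the starvation check and the enlarged prune budget computed in the heartbeat above; the slot and peer counts are invented for illustration, only the formulas come from this diff.

```ts
// Invented numbers; only the formulas come from this diff.
const SLOTS_PER_EPOCH = 32;
const STARVATION_THRESHOLD_SLOTS = SLOTS_PER_EPOCH * 2; // 64 slots
const STARVATION_PRUNE_RATIO = 0.05;

// Starved: head did not advance since the last heartbeat AND is more
// than 64 slots behind the wall clock
const lastHeadSlot = 1000; // lastStatus.headSlot
const headSlot = 1000; // status.headSlot
const currentSlot = 1100; // clock.currentSlot
const starved = lastHeadSlot === headSlot && currentSlot - headSlot > STARVATION_THRESHOLD_SLOTS; // true

// Prune budget: 110 connected, target 100 -> 10 normally, 15 when starved
const targetPeers = 100;
const connectedPeerCount = 110;
const peersToDisconnectTarget =
  connectedPeerCount - targetPeers + (starved ? targetPeers * STARVATION_PRUNE_RATIO : 0);
console.log(peersToDisconnectTarget); // 15
```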
5 changes: 3 additions & 2 deletions packages/beacon-node/src/network/peers/peersData.ts
@@ -1,6 +1,6 @@
import {PeerId} from "@libp2p/interface";
import {Peer, PeerId} from "@libp2p/interface";
import {Encoding} from "@lodestar/reqresp";
import {altair} from "@lodestar/types";
import {altair, phase0} from "@lodestar/types";
import {ClientKind} from "./client.js";

type PeerIdStr = string;
@@ -18,6 +18,7 @@ export type PeerData = {
relevantStatus: RelevantPeerStatus;
direction: "inbound" | "outbound";
peerId: PeerId;
status: phase0.Status | null;
metadata: altair.Metadata | null;
agentVersion: string | null;
agentClient: ClientKind | null;
63 changes: 61 additions & 2 deletions packages/beacon-node/src/network/peers/utils/prioritizePeers.ts
@@ -40,9 +40,50 @@ const syncnetsZero = BitArray.fromBitLen(SYNC_COMMITTEE_SUBNET_COUNT);

type SubnetDiscvQuery = {subnet: SubnetID; toSlot: number; maxPeersToDiscover: number};

/**
* Comparison of our status vs a peer's status.
*
* The main usage of this score is to feed into peer prioritization during syncing, especially when the node is having trouble finding data.
*
* For network stability, we DON'T distinguish peers that are far behind us vs peers that are close to us.
*/
enum StatusScore {
/** The peer is close to our chain */
CLOSE_TO_US = -1,
/** The peer is far ahead of our chain */
FAR_AHEAD = 0,
}

/**
* In practice, this score only tracks if the peer is far ahead of us or not during syncing.
* When the node is synced, the peer is always CLOSE_TO_US.
*/
function computeStatusScore(ours: phase0.Status, theirs: phase0.Status | null, opts: PrioritizePeersOpts): StatusScore {
if (theirs === null) {
return StatusScore.CLOSE_TO_US;
}

if (theirs.finalizedEpoch > ours.finalizedEpoch) {
return StatusScore.FAR_AHEAD;
}

if (theirs.headSlot > ours.headSlot + opts.starvationThresholdSlots) {
return StatusScore.FAR_AHEAD;
}

// It's dangerous to downscore peers that are far behind.
// This means we'd be more likely to disconnect peers that are attempting to sync, which would affect network stability.
// if (ours.headSlot > theirs.headSlot + opts.starvationThresholdSlots) {
// return StatusScore.FAR_BEHIND;
// }

return StatusScore.CLOSE_TO_US;
}

type PeerInfo = {
id: PeerId;
direction: Direction | null;
statusScore: StatusScore;
attnets: phase0.AttestationSubnets;
syncnets: altair.SyncSubnets;
attnetsTrueBitIndices: number[];
@@ -53,6 +94,10 @@ type PeerInfo = {
export interface PrioritizePeersOpts {
targetPeers: number;
maxPeers: number;
status: phase0.Status;
starved: boolean;
starvationPruneRatio: number;
starvationThresholdSlots: number;
outboundPeersRatio?: number;
targetSubnetPeers?: number;
}
@@ -67,6 +112,7 @@ export enum ExcessPeerDisconnectReason {
/**
* Prioritize which peers to disconnect and which to connect. Conditions:
* - Reach `targetPeers`
* - If we're starved for data, prune additional peers
* - Don't exceed `maxPeers`
* - Ensure there are enough peers per active subnet
* - Prioritize peers with good score
@@ -75,6 +121,7 @@ export function prioritizePeers(
connectedPeersInfo: {
id: PeerId;
direction: Direction | null;
status: phase0.Status | null;
attnets: phase0.AttestationSubnets | null;
syncnets: altair.SyncSubnets | null;
score: number;
@@ -98,6 +145,7 @@
(peer): PeerInfo => ({
id: peer.id,
direction: peer.direction,
statusScore: computeStatusScore(opts.status, peer.status, opts),
attnets: peer.attnets ?? attnetsZero,
syncnets: peer.syncnets ?? syncnetsZero,
attnetsTrueBitIndices: peer.attnets?.getTrueBitIndexes() ?? [],
@@ -254,6 +302,11 @@
return false;
}

// Peers far ahead when we're starved for data are not eligible for pruning
if (opts.starved && peer.statusScore === StatusScore.FAR_AHEAD) {
return false;
}

// Outbound peers up to OUTBOUND_PEER_RATIO, sorted by highest score, are not eligible for pruning
if (peer.direction === "outbound") {
if (outboundPeers - outboundPeersEligibleForPruning > outboundPeersTarget) {
@@ -269,7 +322,9 @@
let peersToDisconnectCount = 0;
const noLongLivedSubnetPeersToDisconnect: PeerId[] = [];

const peersToDisconnectTarget = connectedPeerCount - targetPeers;
const peersToDisconnectTarget =
// if we're starved for data, prune additional peers
connectedPeerCount - targetPeers + (opts.starved ? targetPeers * opts.starvationPruneRatio : 0);

// 1. Lodestar prefers disconnecting peers that do not have long-lived subnets
// See https://github.com/ChainSafe/lodestar/issues/3940
@@ -387,7 +442,7 @@
/**
* Sort peers ascending; peer-0 has the highest chance of being pruned, peer-n the least.
* Shuffling first to break ties.
* prefer sorting by dutied subnets first then number of long lived subnets,
* prefer sorting by status score (applicable during syncing), then dutied subnets, then number of long lived subnets, then peer score
* peer score is the last criterion since they are supposed to be in the same score range,
* bad score peers are removed by peer manager anyway
*/
@@ -396,6 +451,10 @@ export function sortPeersToPrune(connectedPeers: PeerInfo[], dutiesByPeer: Map<P
const dutiedSubnet1 = dutiesByPeer.get(p1) ?? 0;
const dutiedSubnet2 = dutiesByPeer.get(p2) ?? 0;
if (dutiedSubnet1 === dutiedSubnet2) {
const statusScore = p1.statusScore - p2.statusScore;
if (statusScore !== 0) {
return statusScore;
}
const [longLivedSubnets1, longLivedSubnets2] = [p1, p2].map(
(p) => p.attnetsTrueBitIndices.length + p.syncnetsTrueBitIndices.length
);
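A hedged illustration of how the new status score affects prune order, with invented statuses; the scoring rules mirror `computeStatusScore` above, and since JavaScript's sort is stable, tied peers keep their shuffled tie-break order.

```ts
// Invented example; scoring rules mirror computeStatusScore in this diff.
type MiniStatus = {finalizedEpoch: number; headSlot: number};
const ours: MiniStatus = {finalizedEpoch: 100, headSlot: 3200};
const starvationThresholdSlots = 64;

function score(theirs: MiniStatus | null): number {
  if (theirs === null) return -1; // CLOSE_TO_US
  if (theirs.finalizedEpoch > ours.finalizedEpoch) return 0; // FAR_AHEAD
  if (theirs.headSlot > ours.headSlot + starvationThresholdSlots) return 0; // FAR_AHEAD
  return -1; // CLOSE_TO_US
}

const peers = [
  {id: "A", status: {finalizedEpoch: 100, headSlot: 3195}}, // close to us -> -1
  {id: "B", status: {finalizedEpoch: 120, headSlot: 3900}}, // far ahead   ->  0
  {id: "C", status: null}, // unknown -> treated as close to us (-1)
];

// Ascending by score: A and C sort before B, so a far-ahead peer is pruned last
const pruneOrder = [...peers].sort((p1, p2) => score(p1.status) - score(p2.status)).map((p) => p.id);
console.log(pruneOrder); // ["A", "C", "B"]
```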
@@ -160,6 +160,12 @@ describe("data serialization through worker boundary", () => {
state: "connected",
direction: "inbound",
agentVersion: "test",
status: null,
metadata: null,
agentClient: "test",
lastReceivedMsgUnixTsMs: 0,
lastStatusUnixTsMs: 0,
connectedUnixTsMs: 0,
};

// If return type is void, set to null
@@ -2,8 +2,8 @@ import {beforeAll, bench, describe} from "@chainsafe/benchmark";
import {generateKeyPair} from "@libp2p/crypto/keys";
import {PeerId} from "@libp2p/interface";
import {peerIdFromPrivateKey} from "@libp2p/peer-id";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {altair, phase0} from "@lodestar/types";
import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {altair, phase0, ssz} from "@lodestar/types";
import {defaultNetworkOptions} from "../../../../../src/network/options.js";
import {RequestedSubnet, prioritizePeers} from "../../../../../src/network/peers/utils/index.js";
import {getAttnets, getSyncnets} from "../../../../utils/network.js";
@@ -103,6 +103,7 @@ describe("prioritizePeers", () => {
Array.from({length: Math.floor(syncnetPercentage * SYNC_COMMITTEE_SUBNET_COUNT)}, (_, i) => i)
),
score: lowestScore + ((highestScore - lowestScore) * i) / defaultNetworkOptions.maxPeers,
status: ssz.phase0.Status.defaultValue(),
}));

const attnets: RequestedSubnet[] = [];
@@ -117,7 +118,13 @@
return {connectedPeers, attnets, syncnets};
},
fn: ({connectedPeers, attnets, syncnets}) => {
prioritizePeers(connectedPeers, attnets, syncnets, defaultNetworkOptions);
prioritizePeers(connectedPeers, attnets, syncnets, {
...defaultNetworkOptions,
status: ssz.phase0.Status.defaultValue(),
starved: false,
starvationPruneRatio: 0.05,
starvationThresholdSlots: SLOTS_PER_EPOCH * 2,
});
},
});
}