Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f4800a9
refactor missing txs collection
deffrian Feb 6, 2026
bd47ea0
missing txs tracker
deffrian Feb 6, 2026
07a27f0
add test
deffrian Feb 6, 2026
e895363
track txs between requests
deffrian Feb 9, 2026
dee87b0
fromArray
deffrian Feb 9, 2026
dd81ca3
verification inside node tx source
deffrian Feb 9, 2026
c6f1471
fix file store
deffrian Feb 9, 2026
3d0e498
fix build
deffrian Feb 9, 2026
5881f21
fix tests
deffrian Feb 10, 2026
87b2cef
verification in file store tx source
deffrian Feb 10, 2026
a14c9f8
Merge branch 'merge-train/spartan' into nikita/dynamic-missing-txs
deffrian Feb 10, 2026
014b6b3
fix build
deffrian Feb 10, 2026
6682f5a
fix lint
deffrian Feb 10, 2026
4b44406
fix test
deffrian Feb 10, 2026
bf208ba
Merge branch 'merge-train/spartan' into nikita/dynamic-missing-txs
deffrian Feb 11, 2026
73f2bb1
fix build & suggestion
deffrian Feb 11, 2026
5a3074f
fix other suggestions
deffrian Feb 11, 2026
2c85faf
actual fix
deffrian Feb 11, 2026
e9f4dba
Merge branch 'merge-train/spartan' into nikita/dynamic-missing-txs
deffrian Feb 12, 2026
cab041b
fix merge
deffrian Feb 12, 2026
49524a2
Merge branch 'merge-train/spartan' into nikita/dynamic-missing-txs
deffrian Feb 16, 2026
497f35f
suggestions
deffrian Feb 16, 2026
1e81f41
fix
deffrian Feb 16, 2026
0bfa842
fix
deffrian Feb 16, 2026
c43786e
fix inverted flag
deffrian Feb 17, 2026
be842f0
Merge branch 'merge-train/spartan' into nikita/dynamic-missing-txs
deffrian Feb 17, 2026
dcb856c
type
deffrian Feb 17, 2026
c8d1ccc
Merge branch 'merge-train/spartan' into nikita/dynamic-missing-txs
deffrian Feb 17, 2026
b103742
fix build
deffrian Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batc
import type { BatchTxRequesterLibP2PService } from '../../services/reqresp/batch-tx-requester/interface.js';
import type { IBatchRequestTxValidator } from '../../services/reqresp/batch-tx-requester/tx_validator.js';
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
import { MissingTxsTracker } from '../../services/tx_collection/missing_txs_tracker.js';
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
import { getPorts } from '../../test-helpers/get-ports.js';
import { makeEnrs } from '../../test-helpers/make-enrs.js';
Expand Down Expand Up @@ -229,7 +230,7 @@ describe('p2p client integration batch txs', () => {
mockP2PService.reqResp = (client0 as any).p2pService.reqresp;

const requester = new BatchTxRequester(
missingTxHashes,
MissingTxsTracker.fromArray(missingTxHashes),
blockProposal,
undefined, // no pinned peer
5_000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ import type { PeerId } from '@libp2p/interface';
import { peerIdFromString } from '@libp2p/peer-id';

import type { P2PConfig } from '../../../config.js';
import { BatchTxRequesterCollector, SendBatchRequestCollector } from '../../../services/index.js';
import type { IBatchRequestTxValidator } from '../../../services/reqresp/batch-tx-requester/tx_validator.js';
import { RateLimitStatus } from '../../../services/reqresp/rate-limiter/rate_limiter.js';
import {
BatchTxRequesterCollector,
SendBatchRequestCollector,
} from '../../../services/tx_collection/proposal_tx_collector.js';
import { AlwaysTrueCircuitVerifier } from '../../../test-helpers/reqresp-nodes.js';
import { MissingTxsTracker } from '../../../services/tx_collection/missing_txs_tracker.js';
import { AlwaysTrueCircuitVerifier } from '../../../test-helpers/index.js';
import {
BENCHMARK_CONSTANTS,
InMemoryAttestationPool,
Expand All @@ -31,7 +29,7 @@ import {
calculateInternalTimeout,
createMockEpochCache,
createMockWorldStateSynchronizer,
} from '../../../test-helpers/testbench-utils.js';
} from '../../../test-helpers/index.js';
import { createP2PClient } from '../../index.js';
import type { P2PClient } from '../../p2p_client.js';
import {
Expand Down Expand Up @@ -214,7 +212,13 @@ async function runCollector(cmd: Extract<WorkerCommand, { type: 'RUN_COLLECTOR'
if (collectorType === 'batch-requester') {
const collector = new BatchTxRequesterCollector(p2pService, logger, new DateProvider(), noopTxValidator);
const fetched = await executeTimeout(
(_signal: AbortSignal) => collector.collectTxs(parsedTxHashes, parsedProposal, pinnedPeer, internalTimeoutMs),
(_signal: AbortSignal) =>
collector.collectTxs(
MissingTxsTracker.fromArray(parsedTxHashes),
parsedProposal,
pinnedPeer,
internalTimeoutMs,
),
timeoutMs,
() => new Error(`Collector timed out after ${timeoutMs}ms`),
);
Expand All @@ -226,7 +230,13 @@ async function runCollector(cmd: Extract<WorkerCommand, { type: 'RUN_COLLECTOR'
BENCHMARK_CONSTANTS.FIXED_MAX_RETRY_ATTEMPTS,
);
const fetched = await executeTimeout(
(_signal: AbortSignal) => collector.collectTxs(parsedTxHashes, parsedProposal, pinnedPeer, internalTimeoutMs),
(_signal: AbortSignal) =>
collector.collectTxs(
MissingTxsTracker.fromArray(parsedTxHashes),
parsedProposal,
pinnedPeer,
internalTimeoutMs,
),
timeoutMs,
() => new Error(`Collector timed out after ${timeoutMs}ms`),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import { createSecp256k1PeerId } from '../../../index.js';
import { MissingTxsTracker } from '../../tx_collection/missing_txs_tracker.js';
import type { ConnectionSampler } from '../connection-sampler/connection_sampler.js';
import type { ReqRespInterface } from '../interface.js';
import { BitVector, BlockTxsRequest, BlockTxsResponse } from '../protocols/index.js';
Expand Down Expand Up @@ -95,7 +96,7 @@ describe('BatchTxRequester', () => {
const clock = new TestClock();

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -150,7 +151,7 @@ describe('BatchTxRequester', () => {
const clock = new TestClock();

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -276,7 +277,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -330,7 +331,7 @@ describe('BatchTxRequester', () => {
const semaphore = new TestSemaphore(new Semaphore(0));

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -382,7 +383,7 @@ describe('BatchTxRequester', () => {

const semaphore = new TestSemaphore(new Semaphore(0));
const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -449,7 +450,7 @@ describe('BatchTxRequester', () => {

const semaphore = new TestSemaphore(new Semaphore(0));
const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -505,7 +506,7 @@ describe('BatchTxRequester', () => {

const semaphore = new TestSemaphore(new Semaphore(0));
const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -561,7 +562,7 @@ describe('BatchTxRequester', () => {
);
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);
const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -635,7 +636,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -749,7 +750,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -885,7 +886,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -960,7 +961,7 @@ describe('BatchTxRequester', () => {
const clock = new TestClock();

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
shortDeadline,
Expand Down Expand Up @@ -1018,7 +1019,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -1090,7 +1091,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -1143,7 +1144,7 @@ describe('BatchTxRequester', () => {
// Create semaphore that starts with 0 permits to block smart workers
const semaphore = new TestSemaphore(new Semaphore(0));
const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -1217,7 +1218,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -1298,7 +1299,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -1369,7 +1370,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
Expand Down Expand Up @@ -1401,6 +1402,62 @@ describe('BatchTxRequester', () => {
});
});

describe('External fetching', () => {
it('should not request transactions that were marked as fetched externally', async () => {
const txCount = 16;
const deadline = 5_000;
const missing = Array.from({ length: txCount }, () => TxHash.random());

blockProposal = await makeBlockProposal({
signer: Secp256k1Signer.random(),
blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }),
archiveRoot: Fr.random(),
txHashes: missing,
});

const peer = await createSecp256k1PeerId();
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer]);

const tracker = MissingTxsTracker.fromArray(missing);

// Peer has only first half of transactions
const peerTransactions = new Map([[peer.toString(), Array.from({ length: TX_BATCH_SIZE }, (_, i) => i)]]);
const { requestLog, mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions);
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

// Create requester first
const requester = new BatchTxRequester(
tracker,
blockProposal,
undefined,
deadline,
mockP2PService,
logger,
new DateProvider(),
{
smartParallelWorkerCount: 0,
dumbParallelWorkerCount: 1,
txValidator,
},
);

// Mark transactions 8-15 as fetched externally after creating requester
for (let i = TX_BATCH_SIZE; i < txCount; i++) {
tracker.markFetched(makeTx(missing[i]));
}

// Run and collect results
const result = await BatchTxRequester.collectAllTxs(requester.run());

// Verify only transactions 0-7 were requested (indices 8-15 were marked fetched)
const allRequestedIndices = requestLog.get(peer.toString())?.flatMap(r => r.indices) || [];
const requestedExternallyFetched = allRequestedIndices.filter(idx => idx >= TX_BATCH_SIZE);

expect(requestedExternallyFetched).toEqual([]);
expect(result.length).toBe(TX_BATCH_SIZE);
});
});

describe('Pinned peer functionality', () => {
it('Should query pinned peer if available', async () => {
const txCount = 10;
Expand Down Expand Up @@ -1430,7 +1487,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -1480,7 +1537,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -1561,7 +1618,7 @@ describe('BatchTxRequester', () => {
});

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -1615,7 +1672,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down Expand Up @@ -1671,7 +1728,7 @@ describe('BatchTxRequester', () => {
reqResp.sendRequestToPeer.mockImplementation(mockImplementation);

const requester = new BatchTxRequester(
missing,
MissingTxsTracker.fromArray(missing),
blockProposal,
pinnedPeer,
deadline,
Expand Down
Loading
Loading