Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batc
import type { BatchTxRequesterLibP2PService } from '../../services/reqresp/batch-tx-requester/interface.js';
import type { IBatchRequestTxValidator } from '../../services/reqresp/batch-tx-requester/tx_validator.js';
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
import { MissingTxsTracker } from '../../services/tx_collection/missing_txs_tracker.js';
import { RequestTracker } from '../../services/tx_collection/request_tracker.js';
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
import { getPorts } from '../../test-helpers/get-ports.js';
import { makeEnrs } from '../../test-helpers/make-enrs.js';
Expand Down Expand Up @@ -231,10 +231,9 @@ describe('p2p client integration batch txs', () => {
mockP2PService.reqResp = (client0 as any).p2pService.reqresp;

const requester = new BatchTxRequester(
MissingTxsTracker.fromArray(missingTxHashes),
RequestTracker.create(missingTxHashes, new Date(Date.now() + 5_000)),
blockProposal,
undefined, // no pinned peer
5_000,
mockP2PService,
logger,
undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type { P2PConfig } from '../../../config.js';
import { BatchTxRequesterCollector, SendBatchRequestCollector } from '../../../services/index.js';
import type { IBatchRequestTxValidator } from '../../../services/reqresp/batch-tx-requester/tx_validator.js';
import { RateLimitStatus } from '../../../services/reqresp/rate-limiter/rate_limiter.js';
import { MissingTxsTracker } from '../../../services/tx_collection/missing_txs_tracker.js';
import { RequestTracker } from '../../../services/tx_collection/request_tracker.js';
import {
AlwaysTrueCircuitVerifier,
BENCHMARK_CONSTANTS,
Expand Down Expand Up @@ -213,10 +213,9 @@ async function runCollector(cmd: Extract<WorkerCommand, { type: 'RUN_COLLECTOR'
const fetched = await executeTimeout(
(_signal: AbortSignal) =>
collector.collectTxs(
MissingTxsTracker.fromArray(parsedTxHashes),
RequestTracker.create(parsedTxHashes, new Date(Date.now() + internalTimeoutMs)),
parsedProposal,
pinnedPeer,
internalTimeoutMs,
),
timeoutMs,
() => new Error(`Collector timed out after ${timeoutMs}ms`),
Expand All @@ -231,10 +230,9 @@ async function runCollector(cmd: Extract<WorkerCommand, { type: 'RUN_COLLECTOR'
const fetched = await executeTimeout(
(_signal: AbortSignal) =>
collector.collectTxs(
MissingTxsTracker.fromArray(parsedTxHashes),
RequestTracker.create(parsedTxHashes, new Date(Date.now() + internalTimeoutMs)),
parsedProposal,
pinnedPeer,
internalTimeoutMs,
),
timeoutMs,
() => new Error(`Collector timed out after ${timeoutMs}ms`),
Expand Down
53 changes: 46 additions & 7 deletions yarn-project/p2p/src/services/reqresp/batch-tx-requester/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,37 @@ class BlockTxsResponse {

The `BitVector` is a compact representation where each bit corresponds to a transaction index in the block proposal. This allows efficient capability advertisement without repeating full hashes.

## Cancellation

All cancellation is managed by a single `RequestTracker` instance, shared across the entire collection
flow. The `RequestTracker` owns the deadline, tracks which txs are still missing, and exposes a
`cancellationToken` promise that resolves when the request should stop (deadline hit, all txs fetched,
or external `cancel()` call).

Cancellation propagates from the deepest stack level upward:

```
RequestTracker.finish()
├── resolves cancellationToken promise
├── BatchTxRequester workers (deepest)
│ ├── shouldStop() checks requestTracker.cancelled → exit loop
│ ├── sleepClampedToDeadline races sleep vs cancellationToken → wakes
│ └── semaphore.acquire races vs cancellationToken → wakes
│ │
│ ▼ workers settle → txQueue.end() → generator returns
├── Node collection loops
│ ├── notFinished() checks requestTracker.cancelled → exit loop
│ └── inter-retry sleep races vs cancellationToken → wakes
│ │
│ ▼ all node loops settle
└── collectFast (outermost)
awaits Promise.allSettled([reqresp, nodes]) → settles after inner tasks
finally: requestTracker.cancel() (idempotent), cleanup
```

## Key Files

| File | Description |
Expand All @@ -179,15 +210,16 @@ The `BitVector` is a compact representation where each bit corresponds to a tran
| `peer_collection.ts` | Manages peer classification (dumb/smart/bad) and rate limiting |
| `interface.ts` | Type definitions for dependencies |
| `../protocols/block_txs/` | Wire protocol definitions (`BlockTxsRequest`, `BlockTxsResponse`, `BitVector`) |
| `../../tx_collection/request_tracker.ts` | Centralized deadline, missing tx tracking, and cancellation signal |

## Stopping Conditions

The `BatchTxRequester` stops when any of these conditions are met:
The `BatchTxRequester` stops when any of these conditions are met, all managed by the `RequestTracker`:

1. **All transactions fetched** - Success!
2. **Deadline exceeded** - Timeout configured by caller
3. **Abort signal** - External cancellation
4. **No transactions to fetch** - Nothing was missing
1. **All transactions fetched** - `markFetched()` removes the last missing tx, triggering `finish()`
2. **Deadline exceeded** - `setTimeout` in `RequestTracker` fires, triggering `finish()`
3. **External cancellation** - `RequestTracker.cancel()` called (e.g., from `stop()`, `stopCollectingForBlocksUpTo`)
4. **No transactions to fetch** - Empty hash set at construction, `RequestTracker` finishes immediately

## Configuration

Expand Down Expand Up @@ -228,11 +260,15 @@ Request to peer fails
## Usage Example

```typescript
const requestTracker = RequestTracker.create(
missingTxHashes, // TxHash[] - what we need
new Date(Date.now() + 5_000), // deadline
);

const requester = new BatchTxRequester(
missingTxHashes, // TxHash[] - what we need
requestTracker, // IRequestTracker - tracks missing txs, deadline, and cancellation
blockTxsSource, // BlockTxsSource - the proposal or block we need txs for
pinnedPeer, // PeerId | undefined - peer expected to have the txs
timeoutMs, // number - how long to try
p2pService, // BatchTxRequesterLibP2PService
);

Expand Down Expand Up @@ -273,6 +309,8 @@ const txs = await BatchTxRequester.collectAllTxs(requester.run());
│ 1. Try RPC nodes first (fast) │ │ Periodic polling of RPC nodes │
│ 2. Fall back to BatchTxRequester │ │ and peers for missing txs │
│ │ │ │
│ Creates RequestTracker per │ │ │
│ request with deadline │ │ │
└───────────────────┬───────────────┘ └─────────────────────────────────────┘
│ For 'proposal' and 'block' requests
Expand All @@ -281,6 +319,7 @@ const txs = await BatchTxRequester.collectAllTxs(requester.run());
│ BatchTxRequester │
│ │
│ Aggressive parallel fetching from multiple peers │
│ Shares RequestTracker with FastTxCollection for unified cancellation │
│ Uses BLOCK_TXS sub-protocol for efficient batching │
└───────────────────┬─────────────────────────────────────────────────────────┘
Expand Down
Loading
Loading